ActiveMQ中消費者是如何接收訊息的(一)
事先說明,本部落格關於ActiveMQ的文章都是基於ActiveMQ5.10版本。
初步用過ActiveMQ但又沒去研究過原始碼的朋友肯定有些好奇ActiveMQ中消費者是如何接收訊息的呢?本文我就和大家一起從原始碼角度來初步探討消費者接收訊息的過程。
我們知道,訊息傳送有兩種模型:點對點(P2P)和釋出訂閱(PUB/SUB),佇列模式中,訊息生產者叫做傳送者,訊息消費者叫做接收者,而在釋出訂閱模式中,訊息生產者叫釋出者,訊息消費者叫訂閱者。點對點模型中佇列(Queue)是訊息傳送和接收的途徑和通道,他保證了一個訊息最多隻能被一個消費者消費,而釋出訂閱模型中,訊息傳送和接收的途徑是主題
1)我們先來看看在點對點模型中消費者是如何接收訊息的
如果直接使用過ActiveMQ API的朋友,一定知道訊息接收者可以通過兩種方式接收訊息,一種是使用同步效果的MessageConsumer#receive() 和非同步的使用訊息監聽器的MessageConsumer#setMessageListener(MessageListener listener) 。值得注意的是,在同一個org.apache.activemq.ActiveMQSession會話物件下面的消費者,如果有的是採用訊息監聽器接收訊息,則那些採用同步receive() 接收訊息的消費者會丟擲 IllegalStateException("Cannot synchronously receive a message when a MessageListener is set")異常,也就是說,同一個Session下面,要麼消費者都使用訊息監聽器,要麼都使用receive() 同步接收。
這是為什麼呢?我們先看下org.apache.activemq.ActiveMQMessageConsumer同步接收的原始碼:
@Override public Message receive() throws JMSException { checkClosed(); checkMessageListener(); sendPullCommand(0); // 如果預取數為0,則主動向JMS伺服器傳送拉取訊息的報文 MessageDispatch md = dequeue(-1); if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); // 給JMS伺服器傳送接收訊息的應答報文 return createActiveMQMessage(md); // 取出訊息副本並返回 }
上面的checkMessageListener()就是去做檢查的,請看:
protected void checkMessageListener() throws JMSException {
// 去呼叫所屬會話的checkMessageListener();方法
session.checkMessageListener();
}
而ActiveMQSession中的原始碼如下:
public void checkMessageListener() throws JMSException {
if (messageListener != null) {
throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
}
// 遍歷由會話建立的消費者中是否有繫結訊息監聽器的消費者,如果有,則拋異常。
for (Iterator i = consumers.iterator(); i.hasNext();) {
ActiveMQMessageConsumer consumer = i.next();
if (consumer.hasMessageListener()) {
throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
}
}
}
如上所示,checkMessageListener() 呼叫的是該消費者所屬會話的checkMessageListener()方法,而會話中的checkMessageListener()方法正是去該會話下面檢視所有的消費者看看是否有采用訊息監聽的,如果有,則立馬丟擲IllegalStateException異常。至於ActiveMQ為什麼要這樣限制,第一是為了防止一個消費者同時採用同步和訊息監聽器兩種方式接收訊息,第二就是這樣導致了無法採用一致的訊息分發方式來將該會話接收到的訊息合理的分配給下面的消費者,第三就是如果是事務性會話,採用兩種方式的消費者是無法管理的。當然,如果你需要採用同步和非同步訊息接收共存,那也很簡單,你只要通過ActiveMQConnection建立兩個會話,一個會話下面建立的消費者都是採用同步接收,另一個會話下面建立的消費者都是採用非同步接收就行了。
下面,我們來看看採用receive() 的內部是如何工作的。 這裡,我們先來了解一下org.apache.activemq.ActiveMQMessageConsumer中幾個重要的成員屬性:
protected final MessageDispatchChannel unconsumedMessages;// 未消費的訊息通道,裡面用來儲存未消費的訊息,該通道容納的最大訊息數為預取值
protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();// 分發給該消費者但未應答的訊息連結串列,列表中的訊息順序和被消費的順序是相反的。
private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages; // 為了對TX的完整性進行驗證,我們需要對一個事務中的訊息重複傳送進行跟蹤。
這裡,我們先給出receive()方法的原始碼:
@Override
public Message receive() throws JMSException {
checkClosed(); // 檢查unconsumedMessages是否關閉
checkMessageListener(); // 檢查是否有其他消費者使用了訊息監聽器
sendPullCommand(0); // 向JMS提供者傳送一個拉取命令來拉取訊息,為下次消費做準備
MessageDispatch md = dequeue(-1); // 從unconsumedMessages取出一個訊息
if (md == null) {
return null;
}
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
return createActiveMQMessage(md);
}
在ActiveMQ中,通過會話建立一個消費者時,就會為這個消費者建立一個未消費的訊息通道,該通道分為兩種,如果你採用的是優先順序佇列,則建立的是SimplePriorityMessageDispatchChannel()簡單優先順序訊息分發的通道,如果不是,則建立的是FifoMessageDispatchChannel()先進先出的分發通道,如果你要問為什麼需要有這個東西,第一,消費者處理訊息是需要時間的,如果每次處理完一條訊息才告知Session我處理完了,你再給我一個,這對於快消費者來說,效率是極低的,所以你得允許Session能夠一次性將多條訊息分給一個消費者,還記得“預取consumer.prefetchSize”的特性嗎?Session將某條訊息傳送到這個消費者時,會先把訊息放入屬於這個消費者的未消費的訊息通道中,我們每呼叫一次消費者的receive() 方法,首先要做的是就是去檢查這個通道是否被關閉,如果被關閉,則會丟擲IllegalStateException("The Consumer is closed");異常,第二步才是去呼叫上面提到的方法去檢查是否有采用訊息監聽器接收訊息的其他消費者“哥們”,如果通過了這兩項檢查,接下來要做的就是非同步向MOM傳送一個pull命令訊息來拉取訊息(注意,只有在預取prefetchSize設定為0且未消費的訊息通道unconsumedMessages中已經沒訊息了才會傳送pull命令訊息,因為只有這時才需要告訴JMS提供者,消費者我已經把訊息處理完了,你得趕緊再給我發一批,當然這個命令的傳送過程是非同步的,這也是為什麼採用receive接收訊息可以設定預取為0的原因),在傳送這個命令之前,客戶端會先清理已分發訊息連結串列deliveredMessages,這一步的處理分為兩種,1.Session是非事務的,如果Session的應答模式是CLIENT_ACKNOWLEDGE,也就是需要客戶端的消費者主動呼叫Message#acknowledge()來應答MOM,由於我們這裡討論的是佇列,所以只是簡單的將deliveredMessages給清空而已(如果是基於主題的,會去遍歷deliveredMessages給每個訊息呼叫ActiveMQConnection#rollbackDuplicate做重複回滾處理);如果Session應答模式不是CLIENT_ACKNOWLEDGE,則不管是佇列還是主題,都只是清空deliveredMessages而已。2.Session是事務的,則會將遍歷deliveredMessages中的訊息放入previouslyDeliveredMessages中來為重發做準備,原始碼如下,false表示還未進行過重發。
for (MessageDispatch delivered : deliveredMessages) {
previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
}
接著,消費者就會直接從unconsumedMessages取出一個訊息,從上面的原始碼可以看出,傳入的時間毫秒引數是-1,所以表示如果unconsumedMessages為空將一直阻塞,如果想設定超時時間,可以使用如下方法同步接收訊息:
public Message receive(long timeout) throws JMSException;
timeout==0表示一點也不阻塞,直接返回,如果是大於零的值則最多阻塞設定的值的毫秒數。
阻塞取訊息這一步走完,如果獲得的訊息分發物件MessageDispatch不為空,這如上面的原始碼,將執行beforeMessageIsConsumed(md);方法,如該方法名所示,該方法主要做消費訊息前的準備工作,如果應答模式不是DUPS_OK_ACKNOWLEDGE或者是佇列模式,則將該訊息分發物件放入deliveredMessages列表的開頭;如果Session是事務的,則(這裡呆會在補充)。接下來呼叫的afterMessageIsConsumed(md, false);的主要作用是應答MOM,所以,當這個方法執行完,你就可以通過MQ的控制檯看到該訊息已經在“Messages Dequeued”中了。最後的createActiveMQMessage(md);作用就更簡單了,直接從md物件中取出訊息的副本進行返回,這樣,訊息接收者客戶端就完成了一條訊息的同步接收。
接著,我們來看看採用訊息監聽器是如何接收訊息的。 消費者可以呼叫public void setMessageListener(MessageListener listener) throws JMSException;方法來給自己設定一個訊息監聽器,下面給出原始碼:
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
checkClosed();
if (info.getPrefetchSize() == 0) {
throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1");
}
if (listener != null) {
boolean wasRunning = session.isRunning();
if (wasRunning) {
session.stop();
}
this.messageListener.set(listener);
session.redispatch(this, unconsumedMessages);
if (wasRunning) {
session.start();
}
} else {
this.messageListener.set(null);
}
}
注意看加粗部分程式碼,可以看出,採用訊息監聽器接收訊息的消費者,預取數必須大於0,JMS給出的說法是非同步消費者不支援。我們來一行行分析程式碼,該方法首先的工作和採用同步接收訊息的方法一樣去檢查unconsumedMessages是否關閉,如果沒有關閉,且listener不為空,則看會話Session是否已經Running,在ActiveMQSession中,有一個叫started的AtomicBoolean,他在Session呼叫自己的啟動方法start()方法時會設定成true,而session.isRunning()方法返回的正是此值,下面給出start()方法的原始碼:
protected void start() throws JMSException {
started.set(true);
for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
ActiveMQMessageConsumer c = iter.next();
c.start();
}
executor.start();
}
可以看出,該方法不是公用的,因為預設是在ActiveMQSession建構函式中呼叫的:
if (connection.isStarted()) {
start();
}
有人會感到奇怪,我在通過ActiveMQConnection建立ActiveMQSession之前並沒有呼叫ActiveMQConnection的start()方法啊,所以Session的建構函式裡面也並沒有啟動Session自己啊?不用著急,因為你隨後呼叫的ActiveMQConnection的start()方法裡面也會去呼叫Session的start()方法,原始碼如下:
@Override
public void start() throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
if (started.compareAndSet(false, true)) {
for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
ActiveMQSession session = i.next();
session.start();
}
}
}
Connection在啟動時會主動去遍歷其下建立的Session,挨個讓Session啟動。經常使用JMSAPI的人應該知道,如果Connection沒有呼叫start()方法時,即使佇列中有訊息,該Connection下面的消費者都是無法獲取到該訊息的(發訊息不同,即使Connection沒有啟動,訊息傳送者仍然可以傳送訊息到JMS伺服器),這下你們都知道原因了吧。好,迴歸正題,如果發現Session已經啟動,它會主動去“關閉”該會話,這是當然的,ActiveMQ得保證該會話下面所有消費者都做好訊息接收準備工作再啟動自己。所以,如果我們直接使用ActiveMQ的API,最好是所有工作都做好後,再去呼叫ActiveMQConnection的start()方法。再保證了此時Session沒有啟動後,很顯然我們得儲存這個listener,因為我們後面還會去呼叫它。接著是session.redispatch(this, unconsumedMessages);,這是去消費該消費者unconsumedMessages中遺留的訊息並將unconsumedMessages清空,因為我們是新建立的消費者,所以這一步就根本什麼也沒做。接著,如果Session是剛開始是啟動的,由於剛才我們關閉過,所以我們會再次去啟動它。這樣,設定訊息監聽器的工作就作完了。