1. 程式人生 > >rabbitMq消費者角度:訊息分發、訊息應答(ACK)、公平分發

rabbitMq消費者角度:訊息分發、訊息應答(ACK)、公平分發

rabbitMq交換機簡介中介紹了rabbitMq的四種類型交換機;rabbitMq生產者角度一篇從生產者角度介紹如何保證訊息被正確傳送到伺服器,如果未正確傳送如何處理;本篇部落格將從消費者角度介紹三個問題:佇列分發訊息到消費者的規則、如何確保訊息一定被正確接受並處理了、如何保證多個消費者負載均衡。

一 、訊息分發

官網的示例中介紹了預設情況下rabbitMqRabbitMQ會一個一個的傳送資訊給下一個消費者(consumer),而不考慮每個任務的時長等等,且是一次性分配,並非一個一個分配。平均的每個消費者將會獲得相等數量的訊息。這樣分發訊息的方式叫做round-robin。這可以表示為下圖:
roud-robin

二、訊息應答-保證訊息被正確接受並處理

結合上面的分發圖說明,預設情況下消費者C1接收到訊息1無論是否正常接受和處理都會立即應答rabbit伺服器,然後訊息1就會從佇列中被刪除,假如C1突然出現異常狀況導致訊息1沒有被處理完畢,那麼訊息1就處理失敗了,也不會有其他消費者去處理訊息1。事實上我們希望的是訊息1如果沒有被C1正確處理完畢,那麼就傳送給其他消費者處理,為了達到這個目的,只需要做兩件事情,第一關閉rabbitMq的自動應答機制,第二消費者正確處理完訊息後手動應答。

核心程式碼展示

        //第二個引數autoAck設定成false表示關閉自動應答
        channel.basicConsume(QUEUE_NAME, false
, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(hashCode + " [x] Received '" + message + "'"); doWork(message); System.out.println(hashCode + " [x] Done"
); //手動應答,第二個引數multiple表示是否批量應答,很明顯現在不是批量應答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false ); }

三、消費者負載均衡

預設情況下rabitMq會把佇列裡面的訊息立即傳送到消費者,無論該消費者有多少訊息沒有應答,也就是說即使發現消費者來不及處理,新的消費者加入進來也沒有辦法處理已經堆積的訊息,因為那些訊息已經被髮送給老消費者了。
chanel.basicQos(prefetchCount)
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個訊息,即一旦有N個訊息還沒有ack,則該consumer將block掉,直到有訊息ack。
這樣做的好處是,如果系統處於高峰期,消費者來不及處理,訊息會堆積在佇列中,新啟動的消費者可以馬上從佇列中取到訊息開始工作。

圖示:
這裡寫圖片描述

上圖描述了這樣一個過程:剛啟動時消費者C1、C2、C3均為空閒狀態,且它們的channel都設定了prefetch=1,佇列中有訊息1~N。下面按照各消費者收到訊息的時間順序說明整個過程。

C1收到訊息1開始處理。
C2收到訊息2開始處理。
C3收到訊息3開始處理。
C2處理完訊息2併產生應答,可以看到圖中訊息2從佇列中移除了。
C2收到訊息4開始處理,可以看到圖中訊息4被標記為已傳送狀態。
C3處理完訊息3併產生應答,可以看到圖中訊息3從佇列中移除了。
C3收到訊息5開始處理,可以看到訊息5被標記為已傳送狀態。
上圖反應的就是直到C3收到訊息5並處理時的整個佇列中所有訊息的狀態。