1. 程式人生 > 其它 >RabbitMQ(二) ——工作佇列

RabbitMQ(二) ——工作佇列

RabbitMQ(二)——工作佇列

(原創內容,轉載請註明來源,謝謝)

一、概述

工作佇列模式(work queue),是有多個消費者的情況下,可以共同消費佇列內的內容,加快訊息處理速度。這是RabbitMQ的基本工作模式。

二、使用方式

和上一篇中的生產和消費訊息的方式一樣,就是需要多在cli程序中開啟一個消費者的php檔案。即需要開啟3個php,一個是生產者的php檔案,兩個消費者的php檔案(或多個php檔案)。

三、工作機制

3.1 輪詢(Round-robin dispatching)

當開啟多個生產者的時候,消費者產生訊息併發送到佇列的情況下,佇列會將訊息均衡的分發給同時開啟的多個消費者。即採用輪詢的方式,假設有兩個消費者c1和c2,第一次有訊息給c1,第二次有訊息給c2,第三個再給c1,以此類推。

3.2 回饋機制(Message acknowledgment)

為了保證訊息的可靠性,RabbitMQ允許使用者採用消費者的ack機制,即只有消費者回饋給佇列ack後,佇列才會將訊息從佇列中剔除。這樣,可以確保佇列中的每個訊息都是確認被消費者處理完畢的。

開啟的方式,只要將方法basic_consume第四個引數設定成false,就表示開啟ack機制。

開啟ack,就必須要記得在消費者的程式碼總,加入回饋的程式碼,否則,訊息會被佇列認為沒有消費,不斷的堵在佇列中,導致佇列堵塞。

要檢視佇列中還沒確認的內容,可以採用RabbitMQ的管理工具——rabbitmqctl。

       sudo rabbitmqctl list_queues namemessages_ready messages_unacknowledged

3.3 訊息持久化(Message durability)

RabbitMQ具有完善的持久化機制,能夠確保訊息的安全性。可以在程式碼中開啟持久化。訊息持久化需要在佇列和訊息分別開啟持久化。

1)佇列持久化:

queue_declare的第三個引數設定為true。

2)訊息持久化:

$msg = new AMQPMessage($data,
       array('delivery_mode' =>AMQPMessage::DELIVERY_MODE_PERSISTENT)
    );

這樣可以確保訊息可以儲存在本地磁碟,因此即使rabbitmq的伺服器宕機的時候,也可以保證訊息的安全性。

當然,這也也存在一定的風險,因為作業系統自身的機制,開啟持久化的時候,作業系統為了保證運作速度,訊息會儲存在作業系統層面的快取,並且定時將訊息存入硬碟。因此,還沒存入磁碟的這一小段間隔(30秒左右),如果伺服器宕機,有可能訊息丟失。但是這個可能性很低,因此安全性已經很高。

3.4 公平分發(Fair dispatch)與預取機制(prefetch)

訊息的輪詢機制,每次有訊息,佇列則直接按照排好的順序將訊息傳給對應的佇列,有可能出現一部分佇列上一條訊息還沒處理完,就出現了下一條的訊息,而另外一部分佇列則空閒的情況。

因此,RabbitMQ允許使用者開啟公平分發機制,讓每個消費者只能接收一個訊息,如果還沒給佇列回覆ack,則訊息來的時候,即使順序輪到佇列,也不會分發給它,而是分發給它下一個空佇列。

開啟方式很簡單,在消費者的channel,在消費訊息之前(使用basic_consume方法),給channel加一個qos機制:

       $channel->basic_qos(null, 1, null);

第二個引數就是要求消費者每次只能處理幾個訊息。

這樣也存在一個隱患,即如果所有的消費者都還沒ack,而生產者又不斷的往佇列發資料,則佇列有可能會塞滿。這個處理方案只有增加消費者,或者從程式碼、邏輯層面控制訊息的產生速度。

——written by linhxx 2017.08.19