(十一)RabbitMQ訊息佇列-如何實現高可用
在前面講到了RabbitMQ高可用叢集的搭建,但是我們知道只是叢集的高可用並不能保證應用在使用訊息佇列時完全沒有問題,例如如果應用連線的RabbitMQ叢集突然宕機了,雖然這個叢集時可以使用的,但是應用訂閱的連線就斷開了,如果有個機房外網出口頻寬被挖掘機弄斷了,那叢集依然是不可用的。所以我們後面會介紹應用APP如何與連線叢集來保證兩者配合默契,以及如何實現跨機房的叢集複製。
應用連線叢集高可用
前面講到應用伺服器通過一個負載均衡服務將連線的流量分發到指定伺服器,如果連線的節點宕機怎麼辦呢。應用伺服器連線叢集主要做兩件事,訂閱和釋出,所以如果是釋出訊息每次都會重新初始化連線所以連線節點的切換對整個系統的可用性影響不大。如果是訂閱訊息就沒有真麼簡單了。首先我們要做到如果連接出現問題應該是丟擲異常而不是終止指令碼,並且這時應該重新連線連線。
好了不廢話了,程式碼如下:
$queueName = 'superrd';
$exchangeName = 'superrd';
while(True){
try {
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange ->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue ->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);
//阻塞模式接收訊息
echo "Message:\n";
while(True){
$queue->consume('processMessage');
//自動ACK應答
//$queue->consume('processMessage', AMQP_AUTOACK);
}
} catch (AMQPConnectionException $e) {
var_dump($e);
// sleep(1);
}
//$conn->disconnect();
}
/*
* 消費回撥函式
* 處理訊息
*/
function processMessage($envelope, $q) {
$msg = $envelope->getBody();
echo $msg."\n"; //處理訊息
$q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}
所以通過以上的程式碼就可以保證伺服器某節點宕機後訂閱的連線自動重連切換。
RabbitMQ叢集異地複製
基於warren的共享儲存模式
這種方式其實並不是跨地區的遠端複製,並且需要共享儲存,如果感興趣的同學可以百度下。
基於Shovel的遠端複製
如果直接基於WAN來組建異地的叢集的話,叢集間大量的資料通訊會產生高昂的費用,另外Erlang也不允許這麼高延遲的通訊。
Shovel是RabbitMQ自帶外掛(2.7.0後),自帶外掛的好處就是可以在RabbitMQ服務啟動時自動啟動Shovel和自定義複製關係。
Shovel執行的原理其實非常簡單。通過定義RabbitMQ上一個佇列和另外一個RabbitMQ上的交換機之間的複製關係來實現遠端複製。也就是說它會在主服務上建立一個佇列來監聽交換機,所以這是到交換機所以的訊息會投遞到該佇列,並且在從服務中訂閱這個佇列,使佇列中的訊息複製到從服務的交換機中。RabbitMQ是一個比較全面的訊息佇列解決方案,我們公司並沒有用到該功能,只是在這提下,感興趣的同學可以搜下。
RabbitMQ技術交流QQ群:327034977(新增時請備註RabbitMQ)