1. 程式人生 > 實用技巧 >rabbitmq ACK訊息確認機制

rabbitmq ACK訊息確認機制

程式碼示例

生產者 p.php

<?php
// 生產者 p.php
//配置資訊
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_1';
$queueName    = 'q_1';
$routeKey     = 'order';
 
//建立連線和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);

// 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟
$exchange->setFlags(AMQP_DURABLE);

// 宣告交換機
$exchange->declareExchange();

// 建立訊息佇列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 設定永續性
$queue->setFlags(AMQP_DURABLE);

// 宣告訊息佇列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 向伺服器佇列推送10條訊息
for ($i = 0; $i < 10; $i++) {
    $msg = 'hello world ' . $i;
    $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
}

  

消費者 c.php

<?php
// 消費者 c.php
//配置資訊
$config = [
    'host'     => 'localhost',
    'port'     => '5672',
    'login'    => 'guest',
    'password' => 'guest',
    'vhost'    => '/'
];

$exchangeName = 'e_1';
$queueName    = 'q_1';
$routeKey     = 'order';
 
//建立連線和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);

// 1:不持久化到磁碟,宕機資料消失 2:持久化到磁碟
$exchange->setFlags(AMQP_DURABLE);

// 宣告交換機
$exchange->declareExchange();

// 建立訊息佇列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);

// 設定永續性
$queue->setFlags(AMQP_DURABLE);

// 宣告訊息佇列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收訊息並處理回撥
$queue->consume('receive');

// 處理回撥的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "\n";
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

學習ack機制前,先了解ack機制使用和不使用兩種情況下區別,這裡我們在生產者中向伺服器迴圈推送10條訊息

消費者使用ack機制

// 接收訊息並處理回撥
$queue->consume('receive');

// 處理回撥的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "\n";

    // 通知生產者任務完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

不使用ack機制

通過對比兩種執行結果,我們可以得知,伺服器會連續推送3個訊息,沒有收到消費者返回時,停止向該消費者推送訊息

在RabbitMQ中有一個prefetch_count的概念,這個引數的意思是允許Consumer最多同時處理幾個任務。我的版本的RabbitMQ預設這個引數是3,也就是說如果某一個Consumer在收到訊息後沒有傳送ACK確認包,RabbitMQ就會任務Consumer還在處理任務,當有3個訊息都沒有傳送ACK確認包時,RabbitMQ就不會再發送訊息給該Consumer。

在控制檯確實3條訊息未確認,7條準備傳送

如果Consumer數量很多或者希望每個Consumer同時只處理一個任務可以通過在Consumer中設定PrefetchCount來實現更加均勻的任務分發。

$channel = new AMQPChannel($connection);
$channel->setPrefetchCount(1);

以上情況是隻一個消費者,如果我們有兩個相同的消費者訂閱相同的佇列,那麼伺服器又是如何將訊息推送給兩個消費者的呢?

複製出 c1.php、 c2.php兩個消費者

c1.php 和 c2.php 都開啟ACK訊息確認機制,

控制檯 顯示有兩個連線

執行結果

可以看出伺服器是平均將任務發給兩個生產者

第二種情況,假如c2.php出故障或者程式碼錯誤導致ACk機制失效。

下面關掉c2.php指令碼,這樣c2.php和伺服器斷開連線

可以得知 c2.php 沒有返回訊息確認時,這三個訊息是Unacked狀態,其他7個訊息由c1.php成功執行時,當c2.php斷開連線時,伺服器將Unacked訊息交由c1.php完成。