1. 程式人生 > 其它 >php rabbitmq的開發體驗(三)

php rabbitmq的開發體驗(三)

一、前言

在上一篇rabbitmq開發體驗(二),我們正式的用我們php來操作訊息佇列的生產和消費,並利用的rabbitmq的高階特性來進行ack確認機制,冪等性,限流機制,重回機制,ttl,死信佇列(相當於失敗訊息的回收站)。已經可以正常的使用,但訊息消費異常問題羅列以下。

1、自動ack機制會導致訊息丟失的問題;

簡要程式碼如下,設定訊息自動ack,這種情況下,MQ只要確認訊息傳送成功,無須等待應答就會丟棄訊息,
這會導致客戶端還未處理完時,出異常或斷電了,導致訊息丟失的後果。解決方法就是把程式碼裡的true,改成false,並在訊息處理完後發ack響應。
注:自動ack還有個弊端,只要佇列不空,RabbitMQ會源源不斷的把訊息推送給客戶端,而不管客戶端能否消費的完。

$this->channel->basic_consume(
    $this->query_name,
    '',     //customer_tag
    false,  //no_local
    true,  //no_ack 訊息自動ack
    false,   //exclusive 排他消費者,即這個佇列只能由一個消費者消費.適用於任務不允許進行併發處理的情況下.比如系統對接
    false,  //nowait

2、自動ack機制會導致訊息丟失的問題;

為了解決問題1,做了改進,簡要程式碼如下:



$this->channel->basic_consume(
    
$this->query_name, '', //customer_tag false, //no_local false, //no_ack 關閉自動ack,手工傳送ack false, //exclusive 排他消費者,即這個佇列只能由一個消費者消費.適用於任務不允許進行併發處理的情況下.比如系統對接 false, //nowait
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //手動在成功消費後傳送ack

先處理訊息,完成後,再做ack響應,失敗就不做ack響應,這樣訊息會儲存在MQ的Unacked訊息裡,不會丟失,看起來沒啥問題,
但是有一次,callback觸發了一個bug,導致所有訊息都丟擲異常,然後佇列的Unacked訊息數暴漲,導致MQ響應越來越慢,甚至崩潰的問題。
原因是如果MQ沒得到ack響應,這些訊息會堆積在Unacked訊息裡,不會拋棄,直至客戶端斷開重連時,才變回ready;
如果Consumer客戶端不斷開連線,這些Unacked訊息,永遠不會變回ready狀態,Unacked訊息多了,佔用記憶體越來越大,就會異常了。
解決辦法就是及時去ack訊息了。

3、啟用nack機制後,導致的死迴圈;

為了解決問題2,再調整一下程式碼,簡要程式碼如下:

catch (Exception $e){
        $this->writeLog('runtime/vm_exception.log',$e->getMessage());
        //傳送nack資訊應答當前訊息處理異常 第三個引數是否重回佇列 預設false不重回佇列
        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
}

嗯,改成這模樣總沒問題了吧,正常就ack,不正常就nack,並等下一次重新消費。
果然,又出問題了,這回又是callback出異常了,但是故障現象是Ready的訊息猛增,一直不見減少。
原因是出異常後,把訊息塞回佇列頭部,下一步又消費這條會出異常的訊息,又出錯,塞回佇列……
進入了死迴圈了,當然新的訊息不會消費,導致堆積了……

我的解決方案:

$retry = $this->getRetryCount($msg);

try {
        $routingKey = $this->getOrigRoutingKey($msg);
        $subMessage = new SubMessage($msg, $routingKey , [
              'retry_count' => $retry, // 重試次數
        ]);

        $this->subscribe($subMessage);

} catch (\Exception $ex) {
                
        $this->writeLog('runtime/vm_consume_failed.log', '消費失敗!' . $ex->getMessage() . $msg->getBody());
        if ($retry > 3) {
               // 超過最大重試次數,訊息無法處理
               $publishFailed($msg);
               return;
        }

        // 訊息處理失敗,稍後重試
        $publishRetry($msg);
}
    /**
     * 獲取訊息重試次數
     * @param AMQPMessage $msg
     * @return int
     */
    protected function getRetryCount($msg)
    {
        $retry = 0;
        if ($msg->has('application_headers')) {
            $headers = $msg->get('application_headers')->getNativeData();
            if (isset($headers['x-death'][0]['count'])) {
                $retry = $headers['x-death'][0]['count'];
            }
        }

        return (int)$retry;
    }
        // 發起延時重試
        $publishRetry = function ($msg) use ($queueName,$exchangeRetryName) {

            /** @var AMQPTable $headers */
            if ($msg->has('application_headers')) {
                $headers = $msg->get('application_headers');
            } else {
                $headers = new AMQPTable();
            }

            $headers->set('x-orig-routing-key', $this->getOrigRoutingKey($msg));

            $properties = $msg->get_properties();
            $properties['application_headers'] = $headers;
            $newMsg = new AMQPMessage($msg->getBody(), $properties);

            $this->channel->basic_publish(
                $newMsg,
                $exchangeRetryName,
                $queueName
            );
            //傳送ack資訊應答當前訊息處理完成
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
    /**
     * 宣告重試佇列
     */
    private function declareRetryQueue()
    {
        $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPTable(array(
            'x-dead-letter-exchange' => $this->exchange_name,
            'x-dead-letter-routing-key' => $this->query_name,
            'x-message-ttl'          => 3 * 1000,
        )));
        $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name);
    }