php rabbitmq的開發體驗(三)
一、前言
在上一篇rabbitmq開發體驗(二),我們正式的用我們php來操作訊息佇列的生產和消費,並利用的rabbitmq的高階特性來進行ack確認機制,冪等性,限流機制,重回機制,ttl,死信佇列(相當於失敗訊息的回收站)。已經可以正常的使用,但訊息消費異常問題羅列以下。
1、自動ack機制會導致訊息丟失的問題;
簡要程式碼如下,設定訊息自動ack,這種情況下,MQ只要確認訊息傳送成功,無須等待應答就會丟棄訊息,
這會導致客戶端還未處理完時,出異常或斷電了,導致訊息丟失的後果。解決方法就是把程式碼裡的true,改成false,並在訊息處理完後發ack響應。
注:自動ack還有個弊端,只要佇列不空,RabbitMQ會源源不斷的把訊息推送給客戶端,而不管客戶端能否消費的完。
$this->channel->basic_consume( $this->query_name, '', //customer_tag false, //no_local true, //no_ack 訊息自動ack false, //exclusive 排他消費者,即這個佇列只能由一個消費者消費.適用於任務不允許進行併發處理的情況下.比如系統對接 false, //nowait
2、自動ack機制會導致訊息丟失的問題;
為了解決問題1,做了改進,簡要程式碼如下:
$this->channel->basic_consume($this->query_name, '', //customer_tag false, //no_local false, //no_ack 關閉自動ack,手工傳送ack false, //exclusive 排他消費者,即這個佇列只能由一個消費者消費.適用於任務不允許進行併發處理的情況下.比如系統對接 false, //nowait
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //手動在成功消費後傳送ack
先處理訊息,完成後,再做ack響應,失敗就不做ack響應,這樣訊息會儲存在MQ的Unacked訊息裡,不會丟失,看起來沒啥問題,
但是有一次,callback觸發了一個bug,導致所有訊息都丟擲異常,然後佇列的Unacked訊息數暴漲,導致MQ響應越來越慢,甚至崩潰的問題。
原因是如果MQ沒得到ack響應,這些訊息會堆積在Unacked訊息裡,不會拋棄,直至客戶端斷開重連時,才變回ready;
如果Consumer客戶端不斷開連線,這些Unacked訊息,永遠不會變回ready狀態,Unacked訊息多了,佔用記憶體越來越大,就會異常了。
解決辦法就是及時去ack訊息了。
3、啟用nack機制後,導致的死迴圈;
為了解決問題2,再調整一下程式碼,簡要程式碼如下:
catch (Exception $e){ $this->writeLog('runtime/vm_exception.log',$e->getMessage()); //傳送nack資訊應答當前訊息處理異常 第三個引數是否重回佇列 預設false不重回佇列 $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true); }
嗯,改成這模樣總沒問題了吧,正常就ack,不正常就nack,並等下一次重新消費。
果然,又出問題了,這回又是callback出異常了,但是故障現象是Ready的訊息猛增,一直不見減少。
原因是出異常後,把訊息塞回佇列頭部,下一步又消費這條會出異常的訊息,又出錯,塞回佇列……
進入了死迴圈了,當然新的訊息不會消費,導致堆積了……
我的解決方案:
$retry = $this->getRetryCount($msg); try { $routingKey = $this->getOrigRoutingKey($msg); $subMessage = new SubMessage($msg, $routingKey , [ 'retry_count' => $retry, // 重試次數 ]); $this->subscribe($subMessage); } catch (\Exception $ex) { $this->writeLog('runtime/vm_consume_failed.log', '消費失敗!' . $ex->getMessage() . $msg->getBody()); if ($retry > 3) { // 超過最大重試次數,訊息無法處理 $publishFailed($msg); return; } // 訊息處理失敗,稍後重試 $publishRetry($msg); }
/** * 獲取訊息重試次數 * @param AMQPMessage $msg * @return int */ protected function getRetryCount($msg) { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; }
// 發起延時重試 $publishRetry = function ($msg) use ($queueName,$exchangeRetryName) { /** @var AMQPTable $headers */ if ($msg->has('application_headers')) { $headers = $msg->get('application_headers'); } else { $headers = new AMQPTable(); } $headers->set('x-orig-routing-key', $this->getOrigRoutingKey($msg)); $properties = $msg->get_properties(); $properties['application_headers'] = $headers; $newMsg = new AMQPMessage($msg->getBody(), $properties); $this->channel->basic_publish( $newMsg, $exchangeRetryName, $queueName ); //傳送ack資訊應答當前訊息處理完成 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); };
/** * 宣告重試佇列 */ private function declareRetryQueue() { $this->channel->queue_declare($this->query_retry_name, false, true, false, false, false,new AMQPTable(array( 'x-dead-letter-exchange' => $this->exchange_name, 'x-dead-letter-routing-key' => $this->query_name, 'x-message-ttl' => 3 * 1000, ))); $this->channel->queue_bind($this->query_retry_name, $this->exchange_retry_name, $this->query_name); }