PHP操作RabbitMQ的類 exchange、queue、route kye、bind
RabbitMQ是常見的消息中間件。也許是還是不夠了解的緣故,感覺功能還好吧。
講到隊列,大家腦子裏第一印象是下邊這樣的。
P生產者推送消息-->隊列-->C消費者取出消息
結構很簡單,但是RabbitMQ應該是為了豐富的功能吧,把“隊列”拆分了。
分成了:exchange(交換機)和queue(隊列)兩個部分
同時說明:
生產者推送消息只推到exchange,不知道會進入哪個queue。
exchange通過一個route key與queue綁定,這時才會知道消息具體落到了哪個queue裏。
而消費則獲取消息,是直接從隊列裏取的。
大概就是以上這個意思,問題是還有兩個特殊的說明:
1.消費者是無法訂閱或者獲取不存在的MessageQueue中信息。
2.消息被Exchange接受以後,如果沒有匹配的Queue,則會被丟棄。
簡單的理解就是,如果exchange和queue不綁定,生產者推送的消息到exchange會直接丟棄(丟失),同時consume也無法完成訂閱。
所以,這裏就有一個問題,無論是推送消息進隊列,還是訂閱消息消費,必須先定義好exchange和queue並通過route key綁定一起。
那麽到底是推送消息的時候定義並綁定呢?還是訂閱的時候定義並綁定呢?
根據那兩個特殊的說明理解,無論是誰定義綁定,都有可能會出現問題。
所以,最終就是推送和消費之前,都嘗試定義exchange和queue,並完成綁定。
推送消息,相對簡單,就一個publish指定exchange和route key就完成了。
消費消息,相對的復雜一點,有兩種方式:
1、推送(push)訂閱方式,使用consume方法訂閱隊列,只要隊列有消息就消費
2、拉取(poll)主動拉取,使用get方法,主動去從隊列一次拉取一條消息
這兩種情況,都有各自的應用場景,可以根據需要自行選擇。
額外提醒一點:盡量不要使用循環的方式調用get方法消費隊列,尤其是處理的消息很多的情況。
如果大量的消費隊列,建議直接使用consume方法。
還有一個情況,當消費者取出消息時,可以不對消息隊列做任何操作,也可以將取出的消息刪除。
畢竟,隊列裏的消息,消費後是需要刪除的,取到消息,發給隊列一個然虧,隊列就刪除該消息。
這個取出消息後的刪除,也分兩種情況:
1、一種是取出就刪除,consume和get兩個方法都有一個參數AMQP_AUTOACK自動反饋
2、另一種是取出後,並不會自動返回刪除,而是將取出來的消息處理之後確認沒有問題了,手動反饋給消息隊列
至於選擇哪種反饋方式,根據需求自行選擇。
關於RabbitMQ還有很多要說的,比如一個exchange可以綁定多個queue,多個exchange可以綁定一個queue(多對多的關系)
還可以根據exchange不同的模式,搭配不同的route key做不同的匹配。各種組合吧。
靈活應用起來功能還是很強大的,只是具體使用時需要仔細,因為一個不小心,不是丟失消息就是多出很多消息。
所知有限,在此不做特復雜的說明,下邊的例子也是一個簡單的完成一個簡單的隊列的操作。演示學習用。
更多的情況,就自行研究擴展。
RabbitMQ操作類(rabbitmq.class.php)
1 <?php 2 // rabbitmq 操作類 3 class RabbitMQ 4 { 5 // 配置變量 6 public $configs = array( 7 ‘host‘ => ‘localhost‘, 8 ‘port‘ => ‘5672‘, 9 ‘login‘ => ‘guest‘, 10 ‘password‘ => ‘guest‘, 11 ‘vhost‘ => ‘/‘ 12 ); 13 public $exchange_name = ‘ex_q_def‘;// 交換機名稱 14 public $queue_name = ‘ex_q_def‘;// 隊列名稱 15 public $route_key = ‘‘;// 路由key的名稱 16 public $durable = true;// 持久化,默認true 17 public $autodelete = false;// 自動刪除 18 19 // 內部通用變量 20 private $_conn = null; 21 private $_exchange = null; 22 private $_channel = null; 23 private $_queue = null; 24 25 // 構造函數 26 public function __construct() 27 { 28 // 初始化隊列 29 $this->init(); 30 } 31 32 // 配置rabbitmq 33 public function set_configs($configs) 34 { 35 // 初始化配置 36 if (!is_array($configs)) { 37 echo ‘configs is not array.‘; 38 } 39 if (!($configs[‘host‘] && $configs[‘port‘] && $configs[‘login‘] && $configs[‘password‘])) { 40 echo ‘configs is empty.‘; 41 } 42 if (!isset($configs[‘vhost‘])) {// 沒有vhost元素,給出默認值 43 $configs[‘vhost‘] = ‘/‘; 44 } else { 45 if (empty($configs[‘vhost‘])) {// 有vhost元素,但是值為空,給出默認值 46 $configs[‘vhost‘] = ‘/‘; 47 } 48 } 49 $this->configs = $configs; 50 } 51 52 // 初始化rabbitmq 53 private function init() 54 { 55 if (!$this->_conn) { 56 $this->_conn = new AMQPConnection($this->configs);// 創建連接對象 57 if (!$this->_conn->connect()) { 58 echo "Cannot connect to the broker \n "; 59 exit(0); 60 } 61 } 62 63 // 創建channel 64 $this->_channel = new AMQPChannel($this->_conn); 65 } 66 67 // 創建隊列(為了保證正常訂閱,避免消息丟失,生產者和消費則都要嘗試創建隊列:交換機和隊列通過路由綁定一起) 68 public function create_queue($exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘) 69 { 70 if ($exchange_name != ‘‘) { 71 // 隊列名參數可以省略,默認與交換機同名 72 $this->exchange_name = $exchange_name;// 更新交換機名稱 73 $this->queue_name = $exchange_name;// 更新隊列名稱 74 } 75 if ($route_key != ‘‘) $this->route_key = $route_key;// 更新路由 76 if ($queue_name != ‘‘) $this->queue_name = $queue_name;// 獨立更新隊列名稱 77 78 // 創建exchange交換機 79 $this->_exchange = new AMQPExchange($this->_channel);// 創建交換機 80 $this->_exchange->setType(AMQP_EX_TYPE_DIRECT);// 設置交換機模式為direct 81 if ($this->durable) { 82 $this->_exchange->setFlags(AMQP_DURABLE);// 設置是否持久化 83 } 84 if ($this->autodelete) { 85 $this->_exchange->setFlags(AMQP_AUTODELETE);// 設置是否自動刪除 86 } 87 $this->_exchange->setName($this->exchange_name);// 設置交換機名稱 88 $this->_exchange->declare(); 89 90 // 創建queue隊列 91 $this->_queue = new AMQPQueue($this->_channel); 92 if ($this->durable) { 93 $this->_queue->setFlags(AMQP_DURABLE);// 設置是否持久化 94 } 95 if ($this->autodelete) { 96 $this->_queue->setFlags(AMQP_AUTODELETE);// 設置是否自動刪除 97 } 98 $this->_queue->setName($this->queue_name);// 設置隊列名稱 99 $this->_queue->declare();// 完成隊列的定義 100 101 // 將queue和exchange通過route_key綁定在一起 102 $this->_queue->bind($this->exchange_name, $this->route_key); 103 } 104 105 // 生產者,向隊列交換機發送消息 106 public function send($msg, $exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘) 107 { 108 $this->create_queue($exchange_name, $route_key, $queue_name); 109 // 消息處理 110 if (is_array($msg)) { 111 $msg = json_encode($msg);// 將數組類型轉換成JSON格式 112 } else { 113 $msg = trim(strval($msg));// 簡單處理一下要發送的消息內容 114 } 115 116 // 生產者推送消息進隊列時,只能將消息推送到交換機exchange中 117 $this->_exchange->publish($msg, $this->route_key); 118 } 119 120 // 消費者,從隊列中獲取數,消費隊列(訂閱) 121 public function run($fun_name, $exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘, $autoack=false) 122 { 123 if (!$fun_name) {// 沒有返回函數,或者隊列不存在 124 return false; 125 } 126 $this->create_queue($exchange_name, $route_key, $queue_name); 127 // 訂閱消息 128 while (true) { 129 if ($autoack) { 130 $this->_queue->consume($fun_name, AMQP_AUTOACK);// 自動應答 131 } else { 132 $this->_queue->consume($fun_name);// 需要手動應答 133 } 134 } 135 } 136 137 // 消費者,從隊列中獲取數,消費隊列(主動獲取) 138 public function get($exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘, $autoack=false) 139 { 140 $this->create_queue($exchange_name, $route_key, $queue_name); 141 // 主動獲取消息 142 if ($autoack) { 143 $msg = $this->_queue->get(AMQP_AUTOACK);// 自動應答 144 } else { 145 $msg = $this->_queue->get();// 需要手動應答 146 } 147 return [‘msg‘=>$msg, ‘queue‘=>$this->_queue]; 148 } 149 } 150 ?>
生產者推送(test_send.php)
1 <?php 2 require_once(‘rabbitmq.class.php‘); 3 4 $rmq = new RabbitMQ; 5 for ($i = 0; $i < 10; $i++) { 6 echo ‘test_consume_‘ . $i .‘<br />‘; 7 $rmq->send(‘test_consume_‘ . $i, ‘test_consume‘); 8 } 9 10 for ($i = 0; $i < 10; $i++) { 11 echo ‘get_msg_‘.$i.‘<br />‘; 12 $rmq->send(‘get_msg_‘ . $i, ‘test_get‘); 13 } 14 15 echo ‘send ok ! ‘ . date(‘Y-m-d H:i:s‘); 16 ?>
消費者consume(test_run.php)
1 <?php 2 require_once(‘rabbitmq.class.php‘); 3 4 $rmq = new RabbitMQ; 5 6 $s = $rmq->run(‘processMessage‘, ‘test_consume‘); 7 8 function processMessage($envelope, $queue) { 9 $msg = $envelope->getBody(); 10 sleep(1); //sleep1秒模擬任務處理 11 echo $msg."\n"; //處理消息 12 $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答 13 } 14 ?>
消費者get(test_get.php)
1 <?php 2 require_once(‘rabbitmq.class.php‘); 3 4 $rmq = new RabbitMQ; 5 6 $r = $rmq->get(‘test_get‘); 7 8 echo $r[‘msg‘]->getBody();// 取到的消息 9 $r[‘queue‘]->ack($r[‘msg‘]->getDeliveryTag());// 手動反饋,刪除消費的消息 10 ?>
訂閱consume可以起多個程序,隊列會輪詢平均的分到每一個訂閱裏。當然,前提是處理速度是一樣,並且都有反饋。
如果處理速度不同,哪個快 ,哪就會分配更多的消息。如果沒有反饋,默認只會推送3條消息,如果一直不給反饋,就不會再有推送了。
此時如果中斷這個沒有反饋的訂閱,因為隊列中沒有刪除,會再次分配到其他訂閱者哪裏繼續推送消費。
同樣的,如果隊列中有消息,隨時開啟新的訂閱,隨時就會分配到消費的消息。
PHP操作RabbitMQ的類 exchange、queue、route kye、bind