1. 程式人生 > >php amqp訊息佇列教程1-程式碼實現例項

php amqp訊息佇列教程1-程式碼實現例項

兩年前曾發過一篇《用Python嘗試RabbitMQ》,沒想到兩年後的今天,基於PHP的amqp資料還是少得可憐,原來的幾個擴充套件也都一個個被廢棄,只剩amqp還健在,且被PECL收錄。雖說被收錄,可官方手冊中的資訊還是略顯單薄。
言歸正傳,amqp擴充套件的安裝就不多說了,可參看前幾天發的那篇《給PHP安裝amqp擴充套件
裝好了amqp後就可以開始編寫程式碼了:

消費者:接收訊息

邏輯:
建立連線-->建立channel-->建立交換機-->建立佇列-->繫結交換機/佇列/路由鍵-->接收訊息
  1. <?php    
  2. /*************************************
     
  3.  * PHP amqp(RabbitMQ) Demo - consumer 
  4.  * Author: Linvo 
  5.  * Date: 2012/7/30 
  6.  *************************************/
  7. //配置資訊
  8. $conn_args = array(  
  9.     'host' => '192.168.1.93',   
  10.     'port' => '5672',   
  11.     'login' => 'guest',   
  12.     'password' => 'guest',  
  13.     'vhost'=>'/'
  14. );    
  15. $e_name
     = 'e_linvo'//交換機名
  16. $q_name = 'q_linvo'//佇列名
  17. $k_route = 'key_1'//路由key
  18. //建立連線和channel
  19. $conn = new AMQPConnection($conn_args);    
  20. if (!$conn->connect()) {    
  21.     die("Cannot connect to the broker!\n");    
  22. }    
  23. $channel = new AMQPChannel($conn);    
  24. //建立交換機   
  25. $ex = new AMQPExchange($channel);    
  26. $ex
    ->setName($e_name);  
  27. $ex->setType(AMQP_EX_TYPE_DIRECT); //direct型別 
  28. $ex->setFlags(AMQP_DURABLE); //持久化
  29. echo"Exchange Status:".$ex->declare()."\n";    
  30. //建立佇列   
  31. $q = new AMQPQueue($channel);  
  32. $q->setName($q_name);    
  33. $q->setFlags(AMQP_DURABLE); //持久化 
  34. echo"Message Total:".$q->declare()."\n";    
  35. //繫結交換機與佇列,並指定路由鍵
  36. echo'Queue Bind: '.$q->bind($e_name$k_route)."\n";  
  37. //阻塞模式接收訊息
  38. echo"Message:\n";    
  39. while(True){  
  40.     $q->consume('processMessage');    
  41.     //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 
  42. }  
  43. $conn->disconnect();    
  44. /** 
  45.  * 消費回撥函式 
  46.  * 處理訊息 
  47.  */
  48. function processMessage($envelope$queue) {  
  49.     $msg = $envelope->getBody();  
  50.     echo$msg."\n"//處理訊息
  51.     $queue->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
  52. }  

生產者:傳送訊息 邏輯:
建立連線-->建立channel-->建立交換機物件-->傳送訊息
  1. <?php    
  2. /************************************* 
  3.  * PHP amqp(RabbitMQ) Demo - publisher 
  4.  * Author: Linvo 
  5.  * Date: 2012/7/30 
  6.  *************************************/
  7. //配置資訊
  8. $conn_args = array(  
  9.     'host' => '192.168.1.93',   
  10.     'port' => '5672',   
  11.     'login' => 'guest',   
  12.     'password' => 'guest',  
  13.     'vhost'=>'/'
  14. );    
  15. $e_name = 'e_linvo'//交換機名
  16. //$q_name = 'q_linvo'; //無需佇列名
  17. $k_route = 'key_1'//路由key
  18. //建立連線和channel
  19. $conn = new AMQPConnection($conn_args);    
  20. if (!$conn->connect()) {    
  21.     die("Cannot connect to the broker!\n");    
  22. }    
  23. $channel = new AMQPChannel($conn);    
  24. //訊息內容
  25. $message = "TEST MESSAGE! 測試訊息!";    
  26. //建立交換機物件   
  27. $ex = new AMQPExchange($channel);    
  28. $ex->setName($e_name);    
  29. //傳送訊息
  30. //$channel->startTransaction(); //開始事務 
  31. for($i=0; $i<5; ++$i){  
  32.     echo"Send Message:".$ex->publish($message$k_route)."\n";   
  33. }  
  34. //$channel->commitTransaction(); //提交事務
  35. $conn->disconnect();    

需要注意的地方是:

queue物件有兩個方法可用於取訊息:consume和get。
前者是阻塞的,無訊息時會被掛起,適合迴圈中使用;

後者則是非阻塞的,取訊息時有則取,無則返回false。

測試截圖

執行消費者:


執行生產者,發訊息:


消費者接收到訊息: