RabbitMQ實現生產者和消費者(帶註釋)
阿新 • • 發佈:2018-12-14
1、生產者:rabbitmq_publisher.php
<?php date_default_timezone_set("Asia/Shanghai"); //配置資訊 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => '******', 'password' => '******', 'vhost'=>'/' ); $item_ex_name = 'queen_item'; //單個店鋪交換機名 $all_ex_name = 'queen_all'; //所有店鋪持久化交換機名 //建立連線和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $item_channel = new AMQPChannel($conn);//單個店鋪通道 //建立交換機物件 $item_ex = new AMQPExchange($item_channel); $item_ex->setName($item_ex_name); //傳送訊息 //$channel->startTransaction(); //開始事務 for($i=0; $i<5; $i++){ sleep(1);//休眠1秒 //訊息內容 $item_message = "TEST MESSAGE(item):".date("Y-m-d h:i:s",time()); echo date("Y-m-d h:i:s",time()) . "Send Message(item):" . $item_ex->publish($item_message, $item_route)."\n"; } //$channel->commitTransaction(); //提交事務 $conn->disconnect(); ?>
2、消費者:rabbitmq_consumer.php
<?php error_reporting(E_ERROR | E_WARNING | E_PARSE | E_NOTICE); //配置資訊 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => '******', 'password' => '******', 'vhost'=>'/' ); $item_ex_name = 'queen_item'; //交換機名 $item_queen_name = 'queen_shop_1'; //佇列名 $item_route = 'shop_1'; //單個店鋪隊里路由key(格式:shop_id) //建立連線和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $item_channel = new AMQPChannel($conn); //建立交換機 $item_ex = new AMQPExchange($item_channel); $item_ex->setName($item_ex_name); $item_ex->setType(AMQP_EX_TYPE_DIRECT); //direct型別 $item_ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status(item):".$item_ex->declare()."\n"; //在某個交換機下建立佇列 $item_queen = new AMQPQueue($item_channel); $item_queen->setName($item_queen_name); $item_queen->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$item_queen->declare()."\n"; //通過路由繫結交換機與佇列 echo 'Queue Bind(item): '.$item_queen->bind($item_ex_name, $item_route)."\n"; //阻塞模式接收訊息 echo "Message:\n"; while(True){ $item_queen->consume('itemMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 } $conn->disconnect(); /** * 消費回撥函式 * 處理訊息 */ function itemMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //處理訊息 $queue->ack($envelope->getDeliveryTag()); //處理成功後手動傳送ACK應答,若不傳送則可以只讀不消費。 } ?>