mq類----2
阿新 • • 發佈:2017-12-07
unset nts eid exception 生產者 pconnect lease fin log
手動應答方式 使用get
my_consumer.php 消費者 生產者和上一篇 一樣
<?php /** * Created by PhpStorm. * User: brady * Date: 2017/12/6 * Time: 19:41 */ require_once "MQ.php"; set_time_limit(0); $configs = array( ‘host‘=>‘192.168.33.30‘, ‘port‘=>5672, ‘username‘=>‘title‘, ‘password‘=>‘title‘, ‘vhost‘=>‘/‘ ); $number = 2; $config = [ ‘exchange_name‘=>‘brady‘, ‘queue_name‘=>"queue_".$number, ‘route_key‘=>"route_".$number ]; $mq = new MQ($configs,$config[‘exchange_name‘],$config[‘queue_name‘],$config[‘route_key‘]); //用類的方式 /*class A{ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); $envelopeID = $envelope->getDeliveryTag(); $pid = posix_getpid(); file_put_contents("log{$pid}.log", $msg.‘|‘.$envelopeID.‘‘."\r\n",FILE_APPEND); $queue->ack($envelopeID); } } $a = new A();*/ class B{ protected $_envelope; protected $_queue; public function __construct($envelope,$queue) { $this->_queue = $queue; $this->_envelope = $envelope; } public function test() { $msg = $this->_envelope->getBody(); $envelopeID = $this->_envelope->getDeliveryTag(); $pid = posix_getpid(); file_put_contents("log{$pid}.log", $msg.‘|‘.$envelopeID.‘‘."\r\n",FILE_APPEND); $this->_queue->ack($envelopeID); } } //用函數的方式 直接在回調裏面處理,也可以在回調裏面,再load新的model 事務在model裏面處理 function processMessage($envelope, $queue) { /* $msg = $envelope->getBody(); $envelopeID = $envelope->getDeliveryTag(); $pid = posix_getpid(); file_put_contents("log{$pid}.log", $msg.‘|‘.$envelopeID.‘‘."\r\n",FILE_APPEND); $queue->ack($envelopeID); //如果設置為AMQP_AUTOACK 那麽不需要該行也可以自動應答*/ $b = new B($envelope,$queue); $b->test(); } //$s = $mq->run(array($a,‘processMessage‘),false); //$s = $mq->run_auto("processMessage",false); $res = $mq->run_manual(); //沒有拿到數據返回的是false if($res){ $content_obj = $res[‘content_boj‘]; $queue_obj = $res[‘queue_obj‘]; $content = $res[‘content_boj‘]->getBody(); //進行應答 //$obj->ack($res[‘message‘]->getDeliveryTag()); $queue_obj->ack($content_obj->getDeliveryTag()); var_dump($content); } else { var_dump("隊列為空"); }
MQ.php
<?php /** * Created by PhpStorm. * User: brady * Date: 2017/12/6 * Time: 14:42 * * amqp協議操作類,可以訪問rabbitMQ * 需先安裝php_amqp擴展 * */ class MQ { //配置 public $configs = array(); //交換機名稱 public $exchange_name = ‘‘; //隊列名稱 public $queue_name = ‘‘; //路由名稱 註意 如果用同一個路由綁定交換機,當推送的時候,會同時推送到這幾個key上 $q = new AMQPQueue($channel); $q->setName(‘queue3‘); $q->setName(‘queue2‘); $q->bind(‘exchange‘,$routingkey); public $route_key = ‘‘; //是否持久化 默認true public $durable = true; /* * 是否自動刪除 * exchange is deleted when all queues have finished using it * queue is deleted when last consumer unsubscribes * */ public $auto_delete = false; //鏡像隊列,打開後消息會在節點之間復制,有master和slave的概念 public $mirror = false; //連接 private $_conn = NULL; //交換機對象 private $_exchange = NULL; //信道對象 private $_channel = NULL; //隊列對象 private $_queue = NULL; /** * MQ constructor. * @configs array(‘host‘=>$host,‘port‘=>5672,‘username‘=>$username,‘password‘=>$password,‘vhost‘=>‘/‘) */ public function __construct($configs=array(),$exchange_name=‘‘,$queue_name=‘‘,$route_key=‘‘) { $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; $this->set_configs($configs); } /** * @desc 配置設置 * @param $configs */ public function set_configs($configs) { if(empty($configs) || !is_array($configs)){ throw new Exception("your config is not array"); } if(empty($configs[‘host‘]) || empty($configs[‘username‘]) || empty($configs[‘password‘])) { throw new Exception("your config is error"); } if(empty($configs[‘vhost‘])){ $configs[‘vhost‘] = ‘/‘; } if(empty($configs[‘port‘])){ $configs[‘port‘] = ‘5672‘; } $configs[‘login‘] = $configs[‘username‘]; unset($configs[‘username‘]); $this->configs = $configs; } /** * 設置是否持久化 * @param $durable */ public function set_durable($durable) { $this->durable = $durable; } /** * 設置是否自動刪除 * @param $auto_delete boolean */ public function set_auto_delete($auto_delete) { $this->auto_delete = $auto_delete; } /** * 設置是否鏡像 * @param $mirror */ public function set_mirror($mirror) { $this->mirror = $mirror; } /** * 連接初始化 */ public function init() { //沒有連接對象,進行連接 有不管 就不用每次都連接和初始化 if(!$this->_conn){ $this->_conn = new AMQPConnection($this->configs); $this->_conn->connect(); $this->init_exchange_queue_route(); } } /** * 初始化 交換機 隊列名 路由 */ public function init_exchange_queue_route() { if(empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)){ throw new Exception("rabbitMQ exchage_name or queue_name or route_key is empty, please check is",‘500‘); } //channel $this->_channel = new AMQPChannel($this->_conn);//創建channel //exchange $this->_exchange = new AMQPExchange($this->_channel);//創建交換機 $this->_exchange->setName($this->exchange_name);//設置交換機名字 $this->_exchange->setType(AMQP_EX_TYPE_DIRECT);//交換機方式為direct if($this->durable) { $this->_exchange->setFlags(AMQP_DURABLE);//是否持久化 } if($this->auto_delete){ $this->_exchange->setFlags(AMQP_AUTODELETE);//是否自動刪除 } $this->_exchange->declareExchange();//申請交換機 //queue $this->_queue = new AMQPQueue($this->_channel); $this->_queue->setName($this->queue_name); if($this->durable){ $this->_queue->setFlags(AMQP_DURABLE); } if($this->auto_delete){ $this->_queue->setFlags(AMQP_AUTODELETE); } if($this->mirror){ $this->_queue->setArgument(‘x-ha-policy‘,‘all‘); } $this->_queue->declareQueue();//申請queue //綁定交換機 $this->_queue->bind($this->exchange_name,$this->route_key); } //關閉連接 public function close() { if($this->_conn){ $this->_conn->disconnect(); } } //斷開連接 public function __destruct() { // TODO: Implement __destruct() method. $this->close(); } //生產消息 public function send($msg) { $this->init(); if(!$this->_conn){ throw new Exception("connect RabbitMQ failed when send message"); } if(is_array($msg)) { $msg = json_encode($msg); } else { $msg = trim(strval($msg)); } return $this->_exchange->publish($msg,$this->route_key); } //消費消息 自動應答模式 public function run_auto($funcation_name,$auto_ack = true) { $this->init(); if(!$funcation_name || !$this->_queue) { throw new Exception("auto ack lose function_name or this->_queue"); } while(true){ if($auto_ack){ $this->_queue->consume($funcation_name,AMQP_AUTOACK); } else { $this->_queue->consume($funcation_name); } } } //手動應答模式 public function run_manual() { $this->init(); //$data = $this->_queue->get(AMQP_AUTOACK); //如果有傳參數,自動應答 $data = $this->_queue->get(); //如果有傳參數,自動應答 if($data){ //不為false 肯定是有對象的 return array(‘queue_obj‘=>$this->_queue,‘content_boj‘=>$data); }else { return false; } } }
暫時就這些了,工作中也就用到這麽多。再深入的,還沒了解到
mq類----2