1. 程式人生 > >rabbitmq訊息佇列php實際應用

rabbitmq訊息佇列php實際應用

rabbitmq 訊息佇列 php應用

前段時間公司需要用到訊息佇列,就湊時間研究了下rabbitmq,由於本人ubuntu環境,windows應用不確定哈。

程式碼貼出來了,根據自身專案編寫,並不適合所有專案,需要調整自行改動。

1.MQ則是遵循了AMQP協議的具體實現和產品,所以需要AMQPLib的支援

composer require php-amqplib/php-amqplib

2.參考

3.base示例


<?php

/**
 * @datetime 2016-11-28  13:30:19
 * @encoding UTF-8 
 * @filename
RabbitmqService.php * @description */
namespace projects\Service; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitmqService { /** * 配置 * @var array */ public $config; /** * @var string */ public $channel; /** * initialized * @var
string */
public $connect; /** * @var string */ public $queue_name = ''; /** * @var string */ public $exchange_name = ''; /** * @var string */ public $severity; public function __construct(array $config) { $this->config = $config; } /** * @param
string $channel_id */
public function open($channel_id = null) { $this->connect = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['pwd'], $this->config['vhost']); $this->channel = $this->connect->channel($channel_id); } /** * @param string $queue * @param bool $passive * @param bool $durable * @param bool $exclusive * @param bool $auto_delete * @param bool $nowait * @param null $arguments * @param null $ticket * @return mixed|null */ public function queueDeclare( $queue = '', $passive = false, $durable = false, $exclusive = false, $auto_delete = false, $nowait = false, $arguments = null, $ticket = null ) { $this->queue_name = $queue; $this->channel->queue_declare($this->queue_name, $passive, $durable, $exclusive, $auto_delete, $nowait, $arguments, $ticket); } /** * @param string $queue_name * @param string $exchange_name * @param string $severity */ public function queueBind($queue_name = '', $exchange_name = '', $severity = '') { if($queue_name == '') $queue_name = $this->queue_name; if($exchange_name == '') $exchange_name = $this->exchange_name; $this->severity = $severity; $this->channel->queue_bind($queue_name, $exchange_name, $this->severity); } /** * @param $path * @param string $data * @param string $type * @return int */ public function putContents($path, $data = '', $type = '') { return file_put_contents($path, $data, $type); } /** * @param array $data * @param $name * @param $exchange_name * @param string $type * @param bool $queue_durable * @param bool $message_durable */ public function send(array $data, $name, $exchange_name, $type = 'fanout', $queue_durable = true, $message_durable = true) { $this->open(); $this->exchangeDeclare($exchange_name, $type, $queue_durable); $this->queueDeclare($name, false, $queue_durable, false, false); if(isset($data['time'])) { $this->basicPublish($data, $message_durable, $name); } $this->close(); } /** * @param $exchange * @param $type * @param bool $durable * @param bool $passive * @param bool $auto_delete * @param bool $internal * @param bool $nowait * @param null $arguments * @param null $ticket */ public function exchangeDeclare( $exchange, $type, $durable = false, $passive = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = null, $ticket = null ) { $this->exchange_name = $exchange; $this->channel->exchange_declare($exchange, $type, $passive, $durable, $auto_delete, $internal, $nowait, $arguments, $ticket); } /** * A Message for use with the Channnel.basic_* methods. * @param $msg * @param $message_durable * @return AMQPMessage */ public function message($msg, $message_durable) { if($message_durable === true) { return new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); } return new AMQPMessage($msg); } /** * close connect */ public function close() { $this->channel->close(); $this->connect->close(); } /** * Publishes a message * @param array $msg * @param bool $message_durable * @param string $routing_key * @param string $exchange * @param bool $mandatory * @param bool $immediate * @param null $ticket */ public function basicPublish(array $msg, $message_durable = true, $routing_key = '', $exchange = '', $mandatory = false, $immediate = false, $ticket = null ) { if(empty($msg['queue'])) { $msg['queue'] = $this->queue_name; } if(is_array($msg)) { $msg = serialize($msg); } if(!is_object($msg)) { $msg = $this->message($msg, $message_durable); } $this->channel->basic_publish($msg, $exchange, $routing_key, $mandatory, $immediate, $ticket); } /** * * Wait for some expected AMQP methods and dispatch to them. * Unexpected methods are queued up for later calls to this PHP * method. */ function wait() { while(count($this->channel->callbacks)) { $this->channel->wait(); } } /** * start a queue consumer * @param string $consumer_tag */ public function basicConsume($consumer_tag) { $this->channel->basic_consume($this->queue_name, $consumer_tag); } /** * acknowledge one or more messages * @param string $delivery_tag */ public function basicAck($delivery_tag) { $this->channel->basic_ack($delivery_tag); } /** * @param string $queue * @param null $callback * @param string $consumer_tag * @param bool $no_local * @param bool $no_ack * @param bool $exclusive * @param bool $nowait * @param null $ticket * @param array $arguments */ function receive( $queue = '', $callback = null, $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $ticket = null, $arguments = array() ) { $this->channel->basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback, $ticket, $arguments); } }