rabbitmq訊息佇列php實際應用
阿新 • • 發佈:2019-01-01
rabbitmq 訊息佇列 php應用
前段時間公司需要用到訊息佇列,我就抽時間研究了一下 RabbitMQ。由於我使用的是 Ubuntu 環境,以下內容在 Windows 上是否適用尚未驗證。
以下程式碼是根據我自己的專案編寫的,並不適合所有專案,如需調整請自行改動。
1. RabbitMQ 是遵循 AMQP 協議的具體實現和產品,所以 PHP 端需要 php-amqplib 的支援,可透過 Composer 安裝:
composer require php-amqplib/php-amqplib
2.參考
3.base示例
<?php
/**
* @datetime 2016-11-28 13:30:19
* @encoding UTF-8
* @filename RabbitmqService.php
* @description
*/
namespace projects\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Thin convenience wrapper around a php-amqplib connection/channel pair.
 *
 * Typical producer usage is a single call to send(), which opens a connection,
 * declares the exchange and queue, publishes one message and closes again.
 * Consumers call open(), queueDeclare(), receive()/basicConsume() and wait().
 */
class RabbitmqService
{
    /**
     * Connection settings. open() reads the keys 'host', 'port', 'user',
     * 'pwd' and 'vhost'.
     * @var array
     */
    public $config;

    /**
     * Channel obtained from the connection in open(); null until then.
     * @var \PhpAmqpLib\Channel\AMQPChannel|null
     */
    public $channel;

    /**
     * Connection created in open(); null until then.
     * @var \PhpAmqpLib\Connection\AMQPStreamConnection|null
     */
    public $connect;

    /**
     * Name of the queue most recently passed to queueDeclare().
     * @var string
     */
    public $queue_name = '';

    /**
     * Name of the exchange most recently passed to exchangeDeclare().
     * @var string
     */
    public $exchange_name = '';

    /**
     * Value most recently passed to queueBind() as its third argument.
     * @var string
     */
    public $severity;

    /**
     * @param array $config connection settings, see $config
     */
    public function __construct(array $config)
    {
        $this->config = $config;
    }

    /**
     * Create the connection from $config and open a channel on it.
     *
     * @param string $channel_id forwarded to the connection's channel() call;
     *                           null is passed through unchanged
     */
    public function open($channel_id = null)
    {
        $this->connect = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['pwd'],
            $this->config['vhost']
        );
        $this->channel = $this->connect->channel($channel_id);
    }

    /**
     * Declare a queue on the current channel and remember its name.
     *
     * @param string $queue
     * @param bool $passive
     * @param bool $durable
     * @param bool $exclusive
     * @param bool $auto_delete
     * @param bool $nowait
     * @param null $arguments
     * @param null $ticket
     * @return mixed|null whatever the channel's queue_declare() returns
     */
    public function queueDeclare(
        $queue = '',
        $passive = false,
        $durable = false,
        $exclusive = false,
        $auto_delete = false,
        $nowait = false,
        $arguments = null,
        $ticket = null
    )
    {
        $this->queue_name = $queue;

        // The docblock always promised a return value; hand the library's result back.
        return $this->channel->queue_declare(
            $this->queue_name,
            $passive,
            $durable,
            $exclusive,
            $auto_delete,
            $nowait,
            $arguments,
            $ticket
        );
    }

    /**
     * Bind a queue to an exchange on the current channel.
     *
     * An empty queue or exchange name falls back to the name remembered from
     * the last queueDeclare() / exchangeDeclare() call.
     *
     * @param string $queue_name
     * @param string $exchange_name
     * @param string $severity passed as the third argument of queue_bind()
     */
    public function queueBind($queue_name = '', $exchange_name = '', $severity = '')
    {
        // The string cast keeps null behaving like '' while avoiding loose == juggling.
        if ((string) $queue_name === '') {
            $queue_name = $this->queue_name;
        }
        if ((string) $exchange_name === '') {
            $exchange_name = $this->exchange_name;
        }

        $this->severity = $severity;
        $this->channel->queue_bind($queue_name, $exchange_name, $this->severity);
    }

    /**
     * Write data to a file via file_put_contents().
     *
     * @param $path
     * @param string $data
     * @param string|int $type file_put_contents() flags (e.g. FILE_APPEND);
     *                         the historical '' default means "no flags"
     * @return int|false bytes written, or false on failure
     */
    public function putContents($path, $data = '', $type = '')
    {
        // file_put_contents() requires integer flags; forwarding the '' default
        // as-is is rejected by PHP, so normalise anything non-numeric to 0.
        $flags = is_numeric($type) ? (int) $type : 0;

        return file_put_contents($path, $data, $flags);
    }

    /**
     * Open a connection, declare the exchange and queue, publish one message
     * and close the connection again.
     *
     * The message is only published when $data contains a 'time' key, and it
     * is published with $name as routing key and an empty exchange name (the
     * basicPublish() default); the declared exchange is not used for routing here.
     *
     * @param array $data
     * @param $name queue name, also used as routing key
     * @param $exchange_name
     * @param string $type exchange type
     * @param bool $queue_durable durability flag for both exchange and queue
     * @param bool $message_durable
     */
    public function send(array $data, $name, $exchange_name, $type = 'fanout', $queue_durable = true, $message_durable = true)
    {
        $this->open();

        try {
            $this->exchangeDeclare($exchange_name, $type, $queue_durable);
            $this->queueDeclare($name, false, $queue_durable, false, false);

            if (isset($data['time'])) {
                $this->basicPublish($data, $message_durable, $name);
            }
        } finally {
            // Release the channel and connection even when declaring or publishing throws.
            $this->close();
        }
    }

    /**
     * Declare an exchange on the current channel and remember its name.
     *
     * Note the parameter order here is ($durable, $passive); the values are
     * forwarded to exchange_declare() as ($passive, $durable).
     *
     * @param $exchange
     * @param $type
     * @param bool $durable
     * @param bool $passive
     * @param bool $auto_delete
     * @param bool $internal
     * @param bool $nowait
     * @param null $arguments
     * @param null $ticket
     */
    public function exchangeDeclare(
        $exchange,
        $type,
        $durable = false,
        $passive = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = null,
        $ticket = null
    )
    {
        $this->exchange_name = $exchange;
        $this->channel->exchange_declare(
            $exchange,
            $type,
            $passive,
            $durable,
            $auto_delete,
            $internal,
            $nowait,
            $arguments,
            $ticket
        );
    }

    /**
     * Build an AMQPMessage for use with the channel's basic_* methods.
     *
     * @param $msg message body
     * @param $message_durable only strict true marks the message as persistent
     * @return AMQPMessage
     */
    public function message($msg, $message_durable)
    {
        if ($message_durable === true) {
            return new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        }

        return new AMQPMessage($msg);
    }

    /**
     * Close the channel and the connection, if they were ever opened.
     */
    public function close()
    {
        // Guard against close() being called before open(), which used to be a fatal error.
        if ($this->channel !== null) {
            $this->channel->close();
        }
        if ($this->connect !== null) {
            $this->connect->close();
        }
    }

    /**
     * Publish a message on the current channel.
     *
     * The array gets a 'queue' entry (defaulting to the last declared queue
     * name) and is sent as a PHP-serialized string.
     *
     * @param array $msg
     * @param bool $message_durable
     * @param string $routing_key
     * @param string $exchange
     * @param bool $mandatory
     * @param bool $immediate
     * @param null $ticket
     */
    public function basicPublish(array $msg,
        $message_durable = true,
        $routing_key = '',
        $exchange = '',
        $mandatory = false,
        $immediate = false,
        $ticket = null
    )
    {
        if (empty($msg['queue'])) {
            $msg['queue'] = $this->queue_name;
        }

        // $msg is always an array here (enforced by the type hint), so it is
        // always serialized and wrapped; the old is_array/is_object checks were dead code.
        // NOTE(review): consumers presumably unserialize() this payload - only do so for trusted producers.
        $message = $this->message(serialize($msg), $message_durable);

        $this->channel->basic_publish($message, $exchange, $routing_key, $mandatory, $immediate, $ticket);
    }

    /**
     * Keep calling the channel's wait() for as long as the channel has
     * registered callbacks.
     */
    public function wait()
    {
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    /**
     * Start a consumer on the last declared queue.
     *
     * @param string $consumer_tag
     */
    public function basicConsume($consumer_tag)
    {
        $this->channel->basic_consume($this->queue_name, $consumer_tag);
    }

    /**
     * Acknowledge a delivery.
     *
     * @param string $delivery_tag
     */
    public function basicAck($delivery_tag)
    {
        $this->channel->basic_ack($delivery_tag);
    }

    /**
     * Start a consumer with full control over the basic_consume() arguments.
     *
     * Note the parameter order differs from basic_consume(): $callback comes
     * second here and is forwarded in the library's seventh position.
     *
     * @param string $queue
     * @param null $callback
     * @param string $consumer_tag
     * @param bool $no_local
     * @param bool $no_ack
     * @param bool $exclusive
     * @param bool $nowait
     * @param null $ticket
     * @param array $arguments
     */
    public function receive(
        $queue = '',
        $callback = null,
        $consumer_tag = '',
        $no_local = false,
        $no_ack = false,
        $exclusive = false,
        $nowait = false,
        $ticket = null,
        $arguments = []
    )
    {
        $this->channel->basic_consume(
            $queue,
            $consumer_tag,
            $no_local,
            $no_ack,
            $exclusive,
            $nowait,
            $callback,
            $ticket,
            $arguments
        );
    }
}