PHP RabbitMQ 延遲佇列示例
阿新 • • 發佈:2019-01-03
<?php
/**
* Created by PhpStorm.
* User: he
* Date: 17-7-17
* Time: 下午5:38
*/
namespace AcmeBundle\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitBase
{
/**
* Dead-letter holding queue name per scene.
* Looked up by scene key when the holding queue is declared in send() and when
* the x-dead-letter-routing-key argument is built in setDelayQueue().
* @var array
*/
private static $scene_out_queue = [
self::TEST => 'test_queue' , // holding queue for the TEST scene
];
/**
* Dead-letter holding exchange name per scene.
* Looked up by scene key when the holding exchange is declared in send() and when
* the x-dead-letter-exchange argument is built in setDelayExchange().
* @var array
*/
private static $scene_out_exchange = [
self::TEST => 'test.exchange', // holding exchange for the TEST scene
];
/**
* Queue delay per scene, in milliseconds (used as the x-message-ttl value).
* @var array
*/
private static $ttl_time = [
self::TEST => 10000, // 86400*3
];
/**
* List of known scenes, returned by getSceneList().
* @var array
*/
private static $scene_list = [
'TEST' => 'TEST', // test scene
];
const TEST = 'TEST';// scene key for the test scene
/**
* AMQP connection created in the constructor.
* @var AMQPStreamConnection
*/
private $connection;
/**
* AMQP channel opened on the connection in the constructor.
* @var \PhpAmqpLib\Channel\AMQPChannel
*/
private $channel;
/**
* Name of the working queue, set by open().
* @var string
*/
private $queue_name;
/**
* Name of the working exchange, set by open().
* @var string
*/
private $exchange_name;
/**
* Scene key used to look up the delay in self::$ttl_time, set by open().
* NOTE: static, so the value is shared by every instance of this class.
* @var string
*/
private static $time_scene;
/**
* Whether queues, exchanges and messages are declared durable/persistent.
* @var bool
*/
private static $is_durable = true;
/**
* Whether delay (TTL + dead-letter) mode is enabled, set by open().
* @var bool
*/
private static $is_delay = false;
/**
* Scene key used to look up the dead-letter exchange in self::$scene_out_exchange.
* @var string
*/
private static $delay_exchange;
/**
* Scene key used to look up the dead-letter queue in self::$scene_out_queue.
* @var string
*/
private static $delay_queue;
/**
* Queue declaration arguments for delay mode (TTL and dead-letter settings).
* @var array
*/
private static $arguments = [];
/**
* Exchange type used for every exchange this class declares.
* @var string
*/
private static $type = 'fanout';
/**
* Type tag placed in front of string values in the queue arguments
* (see the linked AMQP 0-9-1 errata for the tag definitions).
* @link http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3
* @var string
*/
private static $T_STRING_SHORT = 'S';
/**
* Type tag placed in front of integer values in the queue arguments
* (see the linked AMQP 0-9-1 errata for the tag definitions).
* @link http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3
* @var string
*/
private static $T_INT_LONG = 'I';
/**
 * Open the AMQP connection and a channel on it.
 *
 * Every setting is read from $config; a missing key falls back to an empty string.
 *
 * @param array $config Connection settings under the keys `host`, `port`, `user`, `pwd` and `vhost`.
 * @throws \RuntimeException When the new connection does not report itself as connected.
 */
public function __construct(array $config)
{
    $this->connection = new AMQPStreamConnection(
        $config['host'] ?? '',
        $config['port'] ?? '',
        $config['user'] ?? '',
        $config['pwd'] ?? '',
        $config['vhost'] ?? ''
    );

    if (!$this->getCloseStatus()) {
        // Raise instead of echo + exit: a service class must not write to the
        // output or terminate the process; the caller decides how to react.
        throw new \RuntimeException('AMQP Connection fail');
    }

    $this->channel = $this->connection->channel();
}
/**
 * Magic setter: assigns $value to the instance property called $name.
 *
 * Because the assignment runs inside the class, it also reaches private
 * instance properties.
 *
 * @param string $name  Property name.
 * @param mixed  $value Value to store.
 */
public function __set($name, $value)
{
    $this->{$name} = $value;
}
/**
 * Magic getter: returns the instance property called $name.
 *
 * @param string $name Property name.
 * @return mixed|null The property value, or null when it is unset or null.
 */
public function __get($name)
{
    if (!isset($this->$name)) {
        return null;
    }

    return $this->$name;
}
/**
 * Return the list of known scenes.
 *
 * @return array Contents of self::$scene_list.
 */
public function getSceneList()
{
    $scenes = self::$scene_list;

    return $scenes;
}
/*****************************************************佇列服務******************************************************/
/**
 * Configure the queue/exchange this instance works with and, for consumers,
 * declare and bind them.
 *
 * Notes:
 *  - When $is_delay is true a $time_scene is needed; otherwise the error text is returned.
 *  - When $queue_scene or $exchange_scene is empty, $time_scene is used in its place.
 *  - Exceptions raised while preparing are caught here and returned as a string,
 *    they do not propagate to the caller.
 *
 * @param string      $queue_name     Queue name.
 * @param string      $exchange_name  Exchange name.
 * @param bool        $is_receive     True when called by a consumer; exchange and queue are then declared and bound.
 * @param bool        $is_delay       Whether delay mode is enabled.
 * @param string|null $time_scene     Delay scene key, see self::$ttl_time.
 * @param string|null $queue_scene    Dead-letter queue scene key, see self::$scene_out_queue.
 * @param string|null $exchange_scene Dead-letter exchange scene key, see self::$scene_out_exchange.
 * @return bool|string True on success, otherwise a string with the message, line and file of the caught exception.
 */
public function open(
    string $queue_name,
    string $exchange_name,
    bool $is_receive = false,
    bool $is_delay = false,
    string $time_scene = null,
    string $queue_scene = null,
    string $exchange_scene = null
)
{
    $this->queue_name = $queue_name;
    $this->exchange_name = $exchange_name;

    self::$time_scene = $time_scene;
    // Fall back to the time scene when no dedicated scene was given.
    self::$delay_exchange = $exchange_scene ?: $time_scene;
    self::$delay_queue = $queue_scene ?: $time_scene;
    self::$is_delay = $is_delay;

    try {
        // Build the delay arguments for the queue declaration.
        self::getArguments();

        // A consumer declares everything up front so that consuming does not
        // fail on a queue or exchange that has not been created yet.
        if ($is_receive) {
            $this->getExchangeDeclare($this->exchange_name, self::$type, self::$is_durable);
            $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);
            $this->getQueueBind($this->queue_name, $this->exchange_name);
        }
    } catch (\Exception $e) {
        return 'Info:'.$e->getMessage().' Line:'.$e->getLine().' File:'.$e->getFile();
    }

    return true;
}
/**
 * Declare the topology and publish one message, then close the connection.
 *
 * Exceptions raised along the way are caught here and returned as a string,
 * they do not propagate to the caller.
 *
 * @param array $data Message payload.
 * @return bool|string Result of close() on success, otherwise a string with the
 *                     message, line and file of the caught exception.
 */
public function send(array $data)
{
    try {
        if (empty($this->queue_name) || empty($this->exchange_name)) {
            throw new \Exception('arguments queue name or exchange name error');
        }

        // Declare the holding queue/exchange of the scene first
        // (createOutQueue only declares them when delay mode is on).
        if (!empty(self::$time_scene)) {
            $this->createOutQueue(
                self::$scene_out_queue[self::$time_scene] ?? '',
                self::$scene_out_exchange[self::$time_scene] ?? ''
            );
        }

        // Declare the exchange, the queue and the binding between them.
        $this->getExchangeDeclare($this->exchange_name, self::$type, self::$is_durable);
        $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);
        $this->getQueueBind($this->queue_name, $this->exchange_name);

        // Publish to the exchange that was just declared and bound. The second
        // argument of getBasicPublish is the exchange; passing the queue name
        // there sent the message to an exchange named after the queue.
        $this->getBasicPublish($data, $this->exchange_name);

        return $this->close();
    } catch (\Exception $e) {
        return 'Info:'.$e->getMessage().' Line:'.$e->getLine().' File:'.$e->getFile();
    }
}
/**
 * Register a consumer on the channel.
 *
 * All parameters are handed to the channel's basic_consume(); note that
 * $callback sits second here but is forwarded in seventh position.
 *
 * @param string        $queue        Queue to consume from.
 * @param callable|null $callback     Handler invoked for each delivered message.
 * @param string        $consumer_tag Consumer identifier.
 * @param bool          $no_local
 * @param bool          $no_ack
 * @param bool          $exclusive
 * @param bool          $nowait
 * @param mixed|null    $ticket
 * @param array         $arguments
 */
public function receive(
    $queue = '',
    $callback = null,
    $consumer_tag = '',
    $no_local = false,
    $no_ack = false,
    $exclusive = false,
    $nowait = false,
    $ticket = null,
    $arguments = array()
)
{
    $channel = $this->channel;

    $channel->basic_consume($queue, $consumer_tag, $no_local, $no_ack, $exclusive, $nowait, $callback, $ticket, $arguments);
}
/**
 * Keep waiting on the channel for as long as it has registered callbacks.
 *
 * Each iteration calls the channel's own wait().
 */
public function wait()
{
    while (count($this->channel->callbacks) > 0) {
        $this->channel->wait();
    }
}
/*****************************************************私有服務******************************************************/
/**
 * Close the channel and then the connection.
 *
 * @return bool True when the connection no longer reports itself as connected.
 */
private function close()
{
    // The channel lives on the connection, so it has to be closed first;
    // the previous order closed the connection underneath it.
    $this->closeChannel();
    $this->closeConnection();

    return !$this->getCloseStatus();
}
/**
 * Declare the holding queue and exchange for expired messages and bind them.
 *
 * Nothing is declared unless delay mode is enabled; the name check runs either way.
 *
 * @param string $queue_name    Holding queue name.
 * @param string $exchange_name Holding exchange name.
 * @throws \Exception When either name is empty.
 */
private function createOutQueue(string $queue_name = '', string $exchange_name = '')
{
    $nameMissing = empty($queue_name) || empty($exchange_name);
    if ($nameMissing) {
        throw new \Exception('queue name or exchange name is empty');
    }

    if (true !== self::$is_delay) {
        return;
    }

    // Exchange, queue (declared without delay arguments), then the binding.
    $this->getExchangeDeclare($exchange_name, self::$type, self::$is_durable);
    $this->getQueueDeclare($queue_name, self::$is_durable);
    $this->getQueueBind($queue_name, $exchange_name);
}
/**
 * Build the queue declaration arguments for the current mode.
 *
 * Without delay mode the arguments are emptied. With delay mode they hold the
 * message TTL of the time scene plus the dead-letter exchange and routing key.
 *
 * @throws \Exception When delay mode is on and a scene is missing or unknown.
 */
private static function getArguments()
{
    // $arguments is static and outlives a single open() call: start from a
    // clean state so that delay arguments of an earlier call cannot leak into
    // a later non-delayed declaration.
    self::$arguments = [];

    if (true !== self::$is_delay) {
        return;
    }

    if (empty(self::$delay_queue) || empty(self::$delay_exchange)) {
        throw new \Exception(__METHOD__."delay arguments error");
    }

    // Validate the scene before the lookup; an unknown key would otherwise
    // raise a notice and produce a TTL of null.
    if (empty(self::$time_scene) || !isset(self::$ttl_time[self::$time_scene])) {
        throw new \Exception(__METHOD__.': unknown time scene');
    }

    self::$arguments = [
        "x-message-ttl" => [self::$T_INT_LONG, self::$ttl_time[self::$time_scene]],
    ];
    self::setDelayExchange();
    self::setDelayQueue();
}
/**
 * Add the dead-letter routing key (the scene's holding queue name) to the
 * queue declaration arguments.
 *
 * @throws \Exception When delay mode is off, or the queue scene is empty or unknown.
 */
private static function setDelayQueue()
{
    if (true !== self::$is_delay || empty(self::$delay_queue)) {
        throw new \Exception(__METHOD__.'delay arguments error');
    }

    // Check the scene before reading it; the lookup used to run ahead of any
    // validation and raised a notice for an unknown key.
    if (!isset(self::$scene_out_queue[self::$delay_queue])) {
        throw new \Exception(__METHOD__.': unknown delay queue scene');
    }

    // Always merge. The former `empty(...) ?: array_merge(...)` stored the
    // boolean true in $arguments whenever they were still empty.
    self::$arguments = array_merge(self::$arguments, [
        "x-dead-letter-routing-key" => [
            self::$T_STRING_SHORT,
            self::$scene_out_queue[self::$delay_queue],
        ],
    ]);
}
/**
 * Add the dead-letter exchange (the scene's holding exchange name) to the
 * queue declaration arguments.
 *
 * @throws \Exception When delay mode is off, or the exchange scene is empty or unknown.
 */
private static function setDelayExchange()
{
    if (true !== self::$is_delay || empty(self::$delay_exchange)) {
        throw new \Exception(__METHOD__.'delay arguments error');
    }

    // Check the scene before reading it; the lookup used to run ahead of any
    // validation and raised a notice for an unknown key.
    if (!isset(self::$scene_out_exchange[self::$delay_exchange])) {
        throw new \Exception(__METHOD__.': unknown delay exchange scene');
    }

    // Always merge. The former `empty(...) ?: array_merge(...)` stored the
    // boolean true in $arguments whenever they were still empty.
    self::$arguments = array_merge(self::$arguments, [
        "x-dead-letter-exchange" => [
            self::$T_STRING_SHORT,
            self::$scene_out_exchange[self::$delay_exchange],
        ],
    ]);
}
/**
 * Close the AMQP connection.
 *
 * @return mixed|null Whatever the connection's close() returns.
 */
private function closeConnection()
{
    $connection = $this->connection;

    return $connection->close();
}
/**
 * Close the AMQP channel.
 *
 * @return mixed Whatever the channel's close() returns.
 */
private function closeChannel()
{
    $channel = $this->channel;

    return $channel->close();
}
/**
 * Return the underlying AMQP connection.
 *
 * @return AMQPStreamConnection
 */
public function getConnect() : AMQPStreamConnection
{
    $connection = $this->connection;

    return $connection;
}
/**
 * Return the underlying AMQP channel.
 *
 * @return \PhpAmqpLib\Channel\AMQPChannel
 */
public function getChannel()
{
    $channel = $this->channel;

    return $channel;
}
/**
 * Report the connection state.
 *
 * Despite the name, the result is true while the connection is connected.
 *
 * @return bool The connection's isConnected() value cast to bool.
 */
private function getCloseStatus(): bool
{
    $connected = $this->connection->isConnected();

    return (bool)$connected;
}
/**
 * Declare a queue on the channel.
 *
 * The parameters are reordered before being forwarded to the channel's
 * queue_declare(): $durable and $arguments come early here because they are
 * the ones this class sets most often.
 *
 * @param string     $queue       Queue name.
 * @param bool       $durable
 * @param array|null $arguments   Extra declaration arguments.
 * @param bool       $passive
 * @param bool       $exclusive
 * @param bool       $auto_delete
 * @param bool       $nowait
 * @param mixed|null $ticket
 * @return mixed|null Whatever the channel's queue_declare() returns.
 */
private function getQueueDeclare(
    $queue = '',
    $durable = true,
    $arguments = null,
    $passive = false,
    $exclusive = false,
    $auto_delete = false,
    $nowait = false,
    $ticket = null
)
{
    $channel = $this->channel;

    return $channel->queue_declare(
        $queue,
        $passive,
        $durable,
        $exclusive,
        $auto_delete,
        $nowait,
        $arguments,
        $ticket
    );
}
/**
 * Declare an exchange on the channel.
 *
 * $durable is taken before $passive here; the two are swapped back when
 * forwarded to the channel's exchange_declare().
 *
 * @param string     $exchange    Exchange name.
 * @param string     $type        Exchange type.
 * @param bool       $durable
 * @param bool       $passive
 * @param bool       $auto_delete
 * @param bool       $internal
 * @param bool       $nowait
 * @param array|null $arguments
 * @param mixed|null $ticket
 * @return mixed|null Whatever the channel's exchange_declare() returns.
 */
private function getExchangeDeclare(
    $exchange,
    $type,
    $durable = true,
    $passive = false,
    $auto_delete = false,
    $internal = false,
    $nowait = false,
    $arguments = null,
    $ticket = null
)
{
    $channel = $this->channel;

    return $channel->exchange_declare($exchange, $type, $passive, $durable, $auto_delete, $internal, $nowait, $arguments, $ticket);
}
/**
 * Wrap the payload in a message object and publish it on the channel.
 *
 * @param mixed      $msg         Payload; converted in place by self::message().
 * @param string     $exchange    Exchange to publish to.
 * @param string     $routing_key Routing key.
 * @param bool       $mandatory
 * @param bool       $immediate
 * @param mixed|null $ticket
 * @return mixed Whatever the channel's basic_publish() returns.
 */
private function getBasicPublish(
    $msg,
    $exchange = '',
    $routing_key = '',
    $mandatory = false,
    $immediate = false,
    $ticket = null
)
{
    // message() takes $msg by reference and replaces it with an AMQPMessage.
    self::message($msg, $this->queue_name);

    $channel = $this->channel;

    return $channel->basic_publish($msg, $exchange, $routing_key, $mandatory, $immediate, $ticket);
}
/**
 * Convert a payload into an AMQPMessage, in place.
 *
 * An array payload gets the queue name stored under the key `queue` and is
 * serialized first. The message is marked persistent when durability is on.
 *
 * @param mixed  $msg   Payload, replaced by the resulting AMQPMessage.
 * @param string $queue Queue name recorded inside array payloads.
 */
private static function message(&$msg, string $queue)
{
    if (is_array($msg)) {
        $msg['queue'] = $queue;
        $msg = serialize($msg);
    }

    if (!self::$is_durable) {
        $msg = new AMQPMessage($msg);
        return;
    }

    $msg = new AMQPMessage($msg, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
}
/**
 * Bind a queue to an exchange on the channel.
 *
 * @param string     $queue       Queue name.
 * @param string     $exchange    Exchange name.
 * @param string     $routing_key Binding key.
 * @param bool       $nowait
 * @param array|null $arguments
 * @param mixed|null $ticket
 * @return mixed|null Whatever the channel's queue_bind() returns.
 */
private function getQueueBind(
    $queue,
    $exchange,
    $routing_key = '',
    $nowait = false,
    $arguments = null,
    $ticket = null
)
{
    $channel = $this->channel;

    return $channel->queue_bind($queue, $exchange, $routing_key, $nowait, $arguments, $ticket);
}
/**
 * Return the number of messages in the current queue.
 *
 * @deprecated
 * @return int Message count reported by the queue declaration, 0 when none is reported.
 */
public function getMessageNumber(): int
{
    // Redeclare with the same durability and arguments that open()/send() use,
    // so the declaration matches a queue created in delay mode.
    $declared = $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);

    // php-amqplib's queue_declare() answers with a plain array
    // [queue name, message count, consumer count]; it has no method() accessor,
    // so the previous ->method()->message_count() chain was a fatal error.
    return (int)($declared[1] ?? 0);
}
/**
 * Return the number of consumers on the current queue.
 *
 * @deprecated
 * @return int Consumer count reported by the queue declaration, 0 when none is reported.
 */
public function getConsumerNumber(): int
{
    // Redeclare with the same durability and arguments that open()/send() use,
    // so the declaration matches a queue created in delay mode.
    $declared = $this->getQueueDeclare($this->queue_name, self::$is_durable, self::$arguments);

    // php-amqplib's queue_declare() answers with a plain array
    // [queue name, message count, consumer count]; it has no method() accessor,
    // so the previous ->method()->consumer_count() chain was a fatal error.
    return (int)($declared[2] ?? 0);
}
}