程式碼實現RabbitMQ死信佇列的建立
阿新 • • 發佈:2020-10-25
前言:
之前有寫過死信佇列的使用場景以及通過管控臺建立死信。這次就通過程式碼實現死信佇列的建立,同時也分享一下RabbitMQ封裝的類。
準備:
1. 先準備一個死信佇列(最後用來消費)的引數配置,包括虛擬機器,交換機,佇列,有效時間等,如下。
2. 按照上面在RabbitMQ中建立虛擬機器和交換機,死信佇列。並讓交換機與死信佇列繫結,操作方法前面有介紹。
3. 這裡就直接提供rabbitMQ操作的基本封裝的類,包括一個基類,生產者類,消費者類。
3.1. 基類。
<?php
namespace rabbitmq;
/** Member
* AMQPChannel
* AMQPConnection
* AMQPEnvelope
* AMQPExchange
* AMQPQueue
* Class BaseMQ
* @package rabbitMQ
*/
class BaseMQ
{
/** MQ Channel
* @var \AMQPChannel
*/
public $AMQPChannel ;
/** MQ Link
* @var \AMQPConnection
*/
public $AMQPConnection ;
/** MQ Envelope
* @var \AMQPEnvelope
*/
public $AMQPEnvelope ;
/** MQ Exchange
* @var \AMQPExchange
*/
public $AMQPExchange ;
/** MQ Queue
* @var \AMQPQueue
*/
public $AMQPQueue ;
/** conf
* @var
*/
public $conf ;
/** exchange
* @var
*/
public $exchange ;
/**
* queue
* @var
*/
public $queue;
/**
* routes
* @var
*/
public $route;
/**
* queue_args
* @var
*/
public $queueArgs;
/** link
* BaseMQ constructor.
* @throws \AMQPConnectionException
*/
public function __construct($host,$options,$args = [])
{
$config = include 'config/config.php';
if (!$config)
throw new \AMQPConnectionException('config error!');
$this->host = array_merge($config,$host);
isset($options['vhost']) && $this->host['vhost'] = $options['vhost'];
$this->exchange = $options['exchange'];
$this->queue = $options['queue'];
$this->route = $options['route'];
$this->queueArgs = $args;
$this->AMQPConnection = new \AMQPConnection($this->host);
if (!$this->AMQPConnection->connect())
throw new \AMQPConnectionException("Cannot connect to the broker!\n");
}
/**
* close link
*/
public function close()
{
$this->AMQPConnection->disconnect();
}
/** Channel
* @return \AMQPChannel
* @throws \AMQPConnectionException
*/
public function channel()
{
if (!$this->AMQPChannel) {
$this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
}
return $this->AMQPChannel;
}
/** Exchange
* @return \AMQPExchange
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
*/
public function exchange()
{
if (!$this->AMQPExchange) {
$this->AMQPExchange = new \AMQPExchange($this->channel());
$this->AMQPExchange->setName($this->exchange);
}
return $this->AMQPExchange ;
}
/** queue
* @return \AMQPQueue
* @throws \AMQPConnectionException
* @throws \AMQPQueueException
*/
public function queue()
{
if (!$this->AMQPQueue) {
$this->AMQPQueue = new \AMQPQueue($this->channel());
}
return $this->AMQPQueue ;
}
/** Envelope
* @return \AMQPEnvelope
*/
public function envelope()
{
if (!$this->AMQPEnvelope) {
$this->AMQPEnvelope = new \AMQPEnvelope();
}
return $this->AMQPEnvelope;
}
}
3.2. 生產者類。
<?php
//生產
namespace rabbitmq;
class ProductMQ extends BaseMQ
{
/** 只控制傳送成功 不接受消費者是否收到
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
*/
public function publish($message)
{
$message = is_array($message)?json_encode($message):$message;
//頻道
$channel = $this->channel();
//建立交換機物件
$ex = $this->exchange();
return $ex->publish($message, $this->route, AMQP_NOPARAM, array('delivery_mode' => 2));
}
}
3.3. 消費者。
<?php
namespace rabbitmq;
class ConsumerMQ extends BaseMQ
{
/** 接受訊息 如果終止 重連時會有訊息
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
* @throws \AMQPQueueException
*/
public function run($processMessage)
{
// 建立交換機
$ex = $this->exchange();
// direct型別
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 持久化
$ex->setFlags(AMQP_DURABLE);
// 不存在就釋出
$ex->declareExchange();
// 建立佇列
$q = $this->queue();
// 設定佇列名稱
$q->setName($this->queue);
// 持久化
$q->setFlags(AMQP_DURABLE);
// 佇列引數
is_array($this->queueArgs) && $q->setArguments($this->queueArgs);
//echo "Message Total:".$q->declareQueue()."\n";
$q->declareQueue();
//繫結交換機與佇列,並指定路由鍵
// echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";
$q->bind($this->exchange, $this->route);
//阻塞模式接收訊息
// echo "Message:\n";
if (!is_null($processMessage)) {
while (True) {
$q->consume($processMessage);
}
}
$this->close();
}
}
編碼:
上面的死信佇列已經建立好了,接下來主要就是通過程式碼建立一個用於直接生產訊息的普通佇列,但是這個佇列需要設定三個引數。
x-dead-letter-exchange: 關聯死信的交換機
x-dead-letter-routing-key 關聯死信的路由key
x-message-ttl 當前佇列訊息的有效期,也就是多久後訊息自動進行死信佇列,並且從本佇列刪除
1. 程式碼部分:
public function addToDlx()
{
$host = [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => 'report',
'heartbeat' => 60
];
// 普通佇列
$normal = [
'vhost' => 'report', // 虛擬機器
'exchange' => 'normal', // 交換機
'route' => 'normal_route', // 路由key - 用於交換機與佇列進行繫結
'queue' => 'normal_queue', // 佇列
'expire' => 1000*60, // 有效時間單位:毫秒 - 1分鐘
];
// 死信佇列
$normal_dlx = [
'vhost' => 'report',
'exchange' => 'normal_dlx',
'route' => 'normal_dlx_route',
'queue' => 'normal_dlx_queue'
];
// 給普通佇列關聯死信佇列,攜帶的引數
$dlx_args = [
'x-dead-letter-exchange' => $normal_dlx['exchange'],
'x-dead-letter-routing-key' => $normal_dlx['route'],
'x-message-ttl' => $normal['expire'],
];
//////////////// 通過消費者方式建立死信佇列/////////////
$dlx_mq = new ConsumerMQ($host,$normal,$dlx_args);
$dlx_mq->run(null);
////////////////////////////////////////////////////////
//////////////// 將訊息放入普通佇列/////////////////////
$mq = new ProductMQ($host, $normal);
$param = json_encode([
'name' => 'test',
'id' => 11568,
'remark' => '測試一下'
]);
$mq->publish($param);
$mq->close();
////////////////////////////////////////////////////////
}
2. 測試結果:
通過postman點選上面介面,控制檯就可以看出多出了一個normal佇列,並且佇列的Features為“ DTTLDLXDLK ”,$param的訊息也會首先進入“normal”佇列。
2. 1分鐘後(自己設定的),normal的訊息會失效,進而開始新增到了死信佇列“normal_dxl”,可以點選死信檢視最新的訊息資訊。