Laravel5.2使用RabbitMQ初體驗
由於最近需要使用rabbitmq來進行訊息佇列的讀取,但以前從來都沒有接觸過這等高深的技術,所以只好從頭開始研究,下面就把這幾周的成果分享出來,針對沒有接觸過訊息佇列的同學,希望能給你們一些幫助。
- 安裝rabbitmq
- 使用composer安裝laravel的php-amqplib/php-amqblib包
- 推送訊息到佇列和讀取佇列中的訊息
安裝rabbitmq
安裝rabbit首先需要安裝erlang環境,然後下載rabbitmq客戶端進行安裝,由於我實在windows環境下進行搭建的,所以過程很簡單,這裡就不直接介紹安裝方法了,可以自行百度。安裝完成之後記得使用命令列安裝rabbitmq的瀏覽器外掛,然後就可以在瀏覽器中看到rabbitmq的資訊了。
由於我研究的時間不是很長,所以這裡面很多我也不是很清楚。這個Queues下面的列表代表著這個伺服器上存在幾個佇列。
使用composer安裝擴充套件
擴充套件地址
可以直接在composer.json檔案中新增,然後執行composer update,也可以在命令列中執行composer require
推送訊息到rabbitmq中
Github上有詳細的例項程式碼,我這裡只是將其簡化。
config.php
<?php
//建立配置檔案
//使用自動載入類
require_once __DIR__ . '/vendor/autoload.php';
define ('HOST', 'localhost');
define('PORT', 5672); // 注意這裡埠號,在瀏覽器中的埠號是15672,但在這裡確實5672
define('USER', 'guest');
define('PASS', 'guest');
define('VHOST', '/');
//If this is enabled you can see AMQP output on the CLI
//如果為true會顯示一堆<hex>這種型別的除錯資訊。
define('AMQP_DEBUG', false);
amqp_publisher.php
<?php
// 使用配置檔案
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'router'; // 交換器,在我理解,如果兩個佇列使用一個交換器就代表著兩個佇列是同步的,這個佇列裡存在的訊息,在另一個佇列裡也會存在
$queue = 'test'; // 佇列名稱
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST); // 建立連線
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, 'direct', false, true, false);
$channel->queue_bind($queue, $exchange); // 佇列和交換器繫結
$messageBody = 'testinfo12'; // 要推送的訊息
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($message, $exchange); // 推送訊息
$channel->close();
$connection->close();
執行完這段程式碼之後會發現rabbitmq的test佇列會多出一個訊息
從rabbitmq中取出訊息
在這裡我使用的是github中demo中的basic_get.php中的方法,還有一種amqp_consumer.php方法,大家可以試試看。
basic_get.php
<?php
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'router';
$queue = 'test';
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$message = $channel->basic_get($queue); //取出訊息
print_r($message->body);
$channel->basic_ack($message->delivery_info['delivery_tag']); // 確認取出訊息後會傳送一個ack來確認取出來了,然後會從rabbitmq中將這個訊息移除,如果刪掉這段程式碼,會發現rabbitmq中的訊息還是沒有減少
$channel->close();
$connection->close();
在laravel中使用
因為我們需要實時監聽這個訊息佇列中是否有訊息,如果有就取出訊息佇列進行處理。
這裡laravel提供了一個commad。
在專案目錄下執行php artisan make:console GetMessage 這裡的名字可以自己起
然後找到app/Console/Commands,會發現多出了一個GetMessage.php檔案。
<?php
namespace Ckk\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class UserCenter extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'getMessage'; // 這裡是生成命令的名稱
/**
* The console command description.
*
* @var string
*/
protected $description = '這裡是這個命令的描述';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
// 這裡寫具體程式碼,這裡可以建立一個配置檔案,然後使用配置檔案控制訊息佇列的配置資訊
$queue = 'test';
$connection = new AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$message = $channel->basic_get($queue);
$channel->basic_ack($message->delivery_info['delivery_tag']);
if ($message->body) {
print_r($message->body);
\Log::info('queueInfo:' . $message->body);
}
$channel->close();
$connection->close();
}
在Console目錄下的Kernel.php中新增
protected $commands = [
GetMessage::class,
// Commands\Inspire::class,
];
在專案目錄下執行 php artisan list
檢視是否有這個命令
如果操作正確會顯示getMessage後面是一堆亂碼,因為我的命令描述寫的是中文。
然後在命令列中執行php artisan getMessage
會發現rabbitmq中佇列裡的訊息減少了一個,然後開啟日誌檔案,會發現已經寫進去了。那麼怎麼實時進行檢測呢,只要使用linux自帶的crontab不停的執行php artisan getMessage
就好了。