RabbitMQ基礎使用
RabbitMQ
目錄- RabbitMQ
MQ(message queue)
優勢:
應用解耦
非同步提速
削峰填谷
RabbitMQ簡介
官網
基礎概念
Broker: 接收和分發訊息的應用
Virtual host: 虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離
Connection: publisher/consumer和broker之間的TCP連線
Channel:connectiion的邏輯連線;處理每次訪問rabbitmq都建立一個connection造成巨大開銷。
Exchange: 交換機; 根據分發規則,匹配路由分發訊息到queue中。常用型別:direct(point-to-point), topic(publish-subscribe), fanout(multicast)
Queue:佇列
Binding: exchage和queue建立的虛擬連線。
Publisher: 訊息生產者
Consumer: 訊息消費者
安裝rabbitMQ以及擴充套件
安裝rabbitMQ
docker方式:
management版本的有視覺化管理面板
docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management 管理頁面:127.0.0.1:15672 初始使用者/密碼:guest/guest
其他方式:
brew install rabbitmq-c
or
yum install rabbitmq-c
or
github自行原始碼cmake安裝
安裝php的amqp擴充套件
wget http://pecl.php.net/get/amqp-1.10.2.tgz
解壓後進入目錄:
解壓:
tar -zxvf amqp-1.10.2.tgz
進入目錄後執行:
phpize
安裝amqp需要librabbitmq依賴,需要指定rabbitmq的安裝目錄, 我這裡的目錄是brew安裝後的目錄, 不是mac的可以通過下載rabbit-c後cmake安裝:
下載地址: https://github.com/alanxz/rabbitmq-c/archive/master.zip
安裝請參考: https://blog.csdn.net/weixin_33726313/article/details/91963653
./configure --with-php-config=/Applications/MAMP/bin/php/php7.4.2/bin/php-config -with-amqp --with-librabbitmq-dir=/usr/local/Cellar/rabbitmq-c/0.9.0
make && make install
php.ini中引入擴充套件:
extension=amqp.so
安裝php-amqplib/php-amqplib
composer require php-amqplib/php-amqplib
rabbitmqctl命令的使用
RabbitMQ使用
簡單的使用
說明: 最簡單的處理方式, 傳送者傳送訊息到佇列, 消費者連線佇列消費(這裡使用使用預設的exchange)
訊息生產者: publisher.php
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告一個佇列
$queue_name = 'hello';
$channel->queue_declare($queue_name);
//設定訊息併發送佇列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, '', $queue_name);
//關閉通道和連線
$channel->close();
$connection->close();
訊息消費者: consumer.php
//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告一個佇列
$queue_name = 'hello';
$channel->queue_declare($queue_name);
//定義回撥
$callback = function ($msg) {
echo 'received:' . $msg->body;
};
//消費佇列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
$channel->wait();
}
執行:
啟動消費者(ctrl+C 斷開):
php consumer.php
傳送佇列:
php publisher.php
work queues(工作佇列)
說明: 傳送者傳送佇列後多個消費者來分配處理任務
迴圈排程: 執行多個consumer, publisher傳送多個佇列後, RabbitMQ會按順序依次派發任務給consumer;
訊息確認: 為了確保訊息永不丟失,RabbitMQ支援訊息確認。消費者傳送回一個確認(acknowledgement)以告知RabbitMQ已經接收,處理了特定的訊息,並且RabbitMQ可以自由刪除它;
注意: rabbitmq無法釋放任何未確認的訊息,會導致rabbitmq消耗越來越多的記憶體,這時可以使用rabbitmqctl列印messages_unacknowledged
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
訊息持久化: RabbitMQ退出或崩潰時,除非您告訴它,否則它將忘記佇列和訊息。確保訊息不會丟失需要做兩件事:我們需要將佇列和訊息都標記為持久.(注: 對已經存在的佇列不生效)
公平派遣: rabbitmq它只是盲目地將每第n條訊息傳送給第n個consumer,並不會檢視消費者的未確認訊息數.這樣有時會導致有的consumer很忙碌,而有的又很閒; 我們可以通過basic_qos來控制;比如在處理並確認上一條訊息之前,不要將新訊息傳送給consumer;
訊息生產者: publisher.php
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//接收引數組合成字串
$data = implode('', array_splice($argv, 1)) ?? 'default!';
$data = !empty($data) ? $data : 'default!';
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告一個佇列
$queue_name = 'task2';
//durable引數(第三個): 設定為true時標記為持久,rabbitmq重啟後將依然存在
$channel->queue_declare($queue_name, false, true);
//設定訊息併發送佇列(delivery_mode: 設定訊息永續性)
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', $queue_name);
//關閉通道和連線
$channel->close();
$connection->close();
訊息消費者: consumer.php
//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告一個佇列
$queue_name = 'task2';
$channel->queue_declare($queue_name, false, true);
//定義回撥
$callback = function ($msg) {
echo 'received:' . $msg->body, "\n";
//模擬程式執行時間
sleep(5);
//告訴rabbitmq該訊息已經處理
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//上一個訊息未處理完時,不再接收新的訊息
$channel->basic_qos(null, 1, null);
//消費佇列(no_ack=false時(第四個),啟用訊息確認)
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
$channel->wait();
}
//關閉通道和連線
$channel->close();
$connection->close();
publish/subcribe
exchange types(交換型別)
direct: 預設; 與佇列中routing key進行精準匹配
topic: 主題模式, 下面有使用方法
headers: 與direct的模式不同,不是使用routingkey去做繫結。而是通過訊息headers的鍵值對匹配
fanout: 將接收到的所有訊息廣播到它知道的所有佇列中
訊息生產者: publisher
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);
//宣告一個佇列
$queue_name = 'task6';
$channel->queue_declare($queue_name);
//設定訊息併發送佇列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, $exchang_name, $queue_name);
//關閉通道和連線
$channel->close();
$connection->close();
訊息消費者: consumer
//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);
//宣告一個佇列, 佇列名稱隨機
list($queue_name) = $channel->queue_declare("");
//繫結佇列和交換機
$channel->queue_bind($queue_name, $exchang_name);
//定義回撥
$callback = function ($msg) {
echo 'received:' . $msg->body, "\n";
};
//消費佇列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
$channel->wait();
}
//關閉通道和連線
$channel->close();
$connection->close();
記錄日誌
php sub_consumer.php>logs_from_rabbit.log
routing
說明: 佇列和routing繫結後,不在綁定範圍內的將不再處理
訊息產生者:
//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//接收引數組合成字串
$routing_key = $argv[1]; //獲取提示資訊
//將第三個引數組合成字串
$data = implode('', array_splice($argv, 2));
$data = !empty($data) ? $data : 'default!';
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');
//設定訊息併發送佇列
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange_name, $routing_key);
//關閉通道和連線
$channel->close();
$connection->close();
訊息消費者:
//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');
//建立管道
$channel = $connection->channel();
//宣告交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');
//宣告一個佇列
list($queue_name) = $channel->queue_declare("");
//繫結佇列,交換機,routing
$params = array_slice($argv, 1);
foreach ($params as $param) {
$channel->queue_bind($queue_name, $exchange_name, $param);
}
//定義回撥
$callback = function ($msg) {
echo 'received:' . $msg->delivery_info['routing_key'] . ':' . $msg->body, "\n";
};
//消費佇列
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
$channel->wait();
}
//關閉通道和連線
$channel->close();
$connection->close();
topics
說明:程式碼和上面的類似.交換機型別換成topic
,案例如下
接收所有日誌:
php receive_logs_topic.php “#”
要從設施“ kern ”接收所有日誌:
php receive_logs_topic.php “ kern.*”
或者,如果您只想聽聽“關鍵”日誌:
php receive_logs_topic.php “ *.critical”
您可以建立多個繫結:
php receive_logs_topic.php “ kern.*” “*.critical”
併發出帶有路由鍵“ kern.critical ”型別的日誌:
php emit_log_topic.php "kern.critical" "A critical kernel error"
RPC
略...
laravel結合RabbitMQ
參考: https://blog.csdn.net/weixin_44600422/article/details/106317870
FAQ
報錯: 已經安裝了amqp還是報Class 'PhpAmqplib\Connection\AMQPLazyConnection' not found
1: 檢查php是否安裝了amqp擴充套件
2: 在使用時一定要先引入自動載入的檔案: 如下
require __DIR__.'/../vendor/autoload.php';
報錯: The connection timed out after 3 sec while awaiting incoming data
原因: 由於我使用的是預設的使用者guest登入, 而該使用者不允許遠端登入;
解決:
# 進入docker(非docker執行的忽略此步驟)
docker exec -ti rabbitmq /bin/bash
# 增加使用者
rabbitmqctl add_user admin admin
# 設定tag(類似使用者組)
rabbitmqctl set_user_tags admin administrator
# 設定許可權
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
報錯: make的時候報/usr/bin/ld: cannot find -lrabbitmq錯誤
原因: 找不到庫檔案librabbitmq.so
解決: 參考https://www.cnblogs.com/chenhaoyu/p/13925905.html
1: 找到擴充套件的位置
find / -name librabbitmq.so
2: 將庫檔案所在的路徑加入到 /etc/ld.so.conf 尾部,並使之生效
echo '/disk2/temp/rabbitmq-c-master/build/librabbitmq/' >> /etc/ld.so.conf
3: 重新整理配置檔案使之生效
ldconfig
4: 修改環境變數,加入庫的檔案路徑
export LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH
檢視是否已經新增: export -p
5: 將上述 export 命令加入到配置檔案 ~/.bashrc,使之永久生效。
echo 'LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH' >> ~/.bashrc
6: 重新整理配置
source ~/.bashrc
參考資料
https://blog.csdn.net/weixin_44600422/article/details/106317870
https://blog.csdn.net/weixin_33726313/article/details/91963653