1. 程式人生 > 實用技巧 >RabbitMQ基礎使用

RabbitMQ基礎使用

RabbitMQ

目錄

MQ(message queue)

優勢:
應用解耦
非同步提速
削峰填谷

RabbitMQ簡介

官網

https://www.rabbitmq.com

基礎概念

Broker: 接收和分發訊息的應用

Virtual host: 虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離

Connection: publisher/consumer和broker之間的TCP連線

Channel:connectiion的邏輯連線;處理每次訪問rabbitmq都建立一個connection造成巨大開銷。

Exchange: 交換機; 根據分發規則,匹配路由分發訊息到queue中。常用型別:direct(point-to-point), topic(publish-subscribe), fanout(multicast)

Queue:佇列

Binding: exchage和queue建立的虛擬連線。

Publisher: 訊息生產者

Consumer: 訊息消費者

安裝rabbitMQ以及擴充套件

安裝rabbitMQ

docker方式:

management版本的有視覺化管理面板

docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

管理頁面:127.0.0.1:15672

初始使用者/密碼:guest/guest

其他方式:

brew install rabbitmq-c
or
yum install rabbitmq-c
or 
github自行原始碼cmake安裝

安裝php的amqp擴充套件

wget http://pecl.php.net/get/amqp-1.10.2.tgz

解壓後進入目錄:

解壓:
tar -zxvf amqp-1.10.2.tgz

進入目錄後執行: 
phpize

安裝amqp需要librabbitmq依賴,需要指定rabbitmq的安裝目錄, 我這裡的目錄是brew安裝後的目錄, 不是mac的可以通過下載rabbit-c後cmake安裝:

下載地址: https://github.com/alanxz/rabbitmq-c/archive/master.zip
安裝請參考: https://blog.csdn.net/weixin_33726313/article/details/91963653

./configure --with-php-config=/Applications/MAMP/bin/php/php7.4.2/bin/php-config -with-amqp --with-librabbitmq-dir=/usr/local/Cellar/rabbitmq-c/0.9.0

make && make install

php.ini中引入擴充套件:

extension=amqp.so

安裝php-amqplib/php-amqplib

composer require php-amqplib/php-amqplib

rabbitmqctl命令的使用

https://www.rabbitmq.com/rabbitmqctl.8.html

RabbitMQ使用

簡單的使用

說明: 最簡單的處理方式, 傳送者傳送訊息到佇列, 消費者連線佇列消費(這裡使用使用預設的exchange)

訊息生產者: publisher.php

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告一個佇列
$queue_name = 'hello';
$channel->queue_declare($queue_name);

//設定訊息併發送佇列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, '', $queue_name);

//關閉通道和連線
$channel->close();
$connection->close();

訊息消費者: consumer.php

//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告一個佇列
$queue_name = 'hello';
$channel->queue_declare($queue_name);

//定義回撥
$callback = function ($msg) {
    echo 'received:' . $msg->body;
};

//消費佇列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
    $channel->wait();
}

執行:

啟動消費者(ctrl+C 斷開): 
php consumer.php

傳送佇列:
php publisher.php

work queues(工作佇列)

說明: 傳送者傳送佇列後多個消費者來分配處理任務

迴圈排程: 執行多個consumer, publisher傳送多個佇列後, RabbitMQ會按順序依次派發任務給consumer;

訊息確認: 為了確保訊息永不丟失,RabbitMQ支援訊息確認。消費者傳送回一個確認(acknowledgement)以告知RabbitMQ已經接收,處理了特定的訊息,並且RabbitMQ可以自由刪除它;

注意: rabbitmq無法釋放任何未確認的訊息,會導致rabbitmq消耗越來越多的記憶體,這時可以使用rabbitmqctl列印messages_unacknowledged

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

訊息持久化: RabbitMQ退出或崩潰時,除非您告訴它,否則它將忘記佇列和訊息。確保訊息不會丟失需要做兩件事:我們需要將佇列和訊息都標記為持久.(注: 對已經存在的佇列不生效)

公平派遣: rabbitmq它只是盲目地將每第n條訊息傳送給第n個consumer,並不會檢視消費者的未確認訊息數.這樣有時會導致有的consumer很忙碌,而有的又很閒; 我們可以通過basic_qos來控制;比如在處理並確認上一條訊息之前,不要將新訊息傳送給consumer;

訊息生產者: publisher.php

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//接收引數組合成字串
$data = implode('', array_splice($argv, 1)) ?? 'default!';
$data = !empty($data) ? $data : 'default!';

//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告一個佇列
$queue_name = 'task2';
//durable引數(第三個): 設定為true時標記為持久,rabbitmq重啟後將依然存在
$channel->queue_declare($queue_name, false, true);

//設定訊息併發送佇列(delivery_mode: 設定訊息永續性)
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, '', $queue_name);

//關閉通道和連線
$channel->close();
$connection->close();

訊息消費者: consumer.php

//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告一個佇列
$queue_name = 'task2';
$channel->queue_declare($queue_name, false, true);

//定義回撥
$callback = function ($msg) {
    echo 'received:' . $msg->body, "\n";
    //模擬程式執行時間
    sleep(5);
    //告訴rabbitmq該訊息已經處理
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

//上一個訊息未處理完時,不再接收新的訊息
$channel->basic_qos(null, 1, null);
//消費佇列(no_ack=false時(第四個),啟用訊息確認)
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
    $channel->wait();
}

//關閉通道和連線
$channel->close();
$connection->close();

publish/subcribe

exchange types(交換型別)

direct: 預設; 與佇列中routing key進行精準匹配

topic: 主題模式, 下面有使用方法

headers: 與direct的模式不同,不是使用routingkey去做繫結。而是通過訊息headers的鍵值對匹配

fanout: 將接收到的所有訊息廣播到它知道的所有佇列中

訊息生產者: publisher

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);


//宣告一個佇列
$queue_name = 'task6';
$channel->queue_declare($queue_name);

//設定訊息併發送佇列
$msg = new AMQPMessage('hello world');
$channel->basic_publish($msg, $exchang_name, $queue_name);

//關閉通道和連線
$channel->close();
$connection->close();

訊息消費者: consumer

//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告交換機
$exchang_name = 'logs2';
$channel->exchange_declare($exchang_name, 'fanout', false, false, false);

//宣告一個佇列, 佇列名稱隨機
list($queue_name) = $channel->queue_declare("");

//繫結佇列和交換機
$channel->queue_bind($queue_name, $exchang_name);

//定義回撥
$callback = function ($msg) {
    echo 'received:' . $msg->body, "\n";
};

//消費佇列
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
    $channel->wait();
}

//關閉通道和連線
$channel->close();
$connection->close();

記錄日誌

php sub_consumer.php>logs_from_rabbit.log

routing

說明: 佇列和routing繫結後,不在綁定範圍內的將不再處理

訊息產生者:

//引入auoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

//接收引數組合成字串
$routing_key = $argv[1];    //獲取提示資訊
//將第三個引數組合成字串
$data = implode('', array_splice($argv, 2));
$data = !empty($data) ? $data : 'default!';

//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');

//設定訊息併發送佇列
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, $exchange_name, $routing_key);

//關閉通道和連線
$channel->close();
$connection->close();

訊息消費者:

//載入autoload
require_once __DIR__ . '/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;


//連線
$connection = new AMQPStreamConnection('10.8.7.15', 5672, 'admin', 'admin');

//建立管道
$channel = $connection->channel();

//宣告交換機
$exchange_name = 'logs_record';
$channel->exchange_declare($exchange_name, 'direct');

//宣告一個佇列
list($queue_name) = $channel->queue_declare("");

//繫結佇列,交換機,routing
$params = array_slice($argv, 1);
foreach ($params as $param) {
    $channel->queue_bind($queue_name, $exchange_name, $param);
}

//定義回撥
$callback = function ($msg) {
    echo 'received:' . $msg->delivery_info['routing_key'] . ':' . $msg->body, "\n";
};

//消費佇列
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);

//監聽回撥並執行回撥程式
while ($channel->is_consuming()) {
    $channel->wait();
}

//關閉通道和連線
$channel->close();
$connection->close();

topics

說明:程式碼和上面的類似.交換機型別換成topic,案例如下

接收所有日誌:
php receive_logs_topic.php “#”

要從設施“ kern ”接收所有日誌:
php receive_logs_topic.php “ kern.*”

或者,如果您只想聽聽“關鍵”日誌:
php receive_logs_topic.php “ *.critical”

您可以建立多個繫結:
php receive_logs_topic.php “ kern.*”  “*.critical”

併發出帶有路由鍵“ kern.critical ”型別的日誌:
php emit_log_topic.php "kern.critical" "A critical kernel error"

RPC

略...

laravel結合RabbitMQ

參考: https://blog.csdn.net/weixin_44600422/article/details/106317870

FAQ

報錯: 已經安裝了amqp還是報Class 'PhpAmqplib\Connection\AMQPLazyConnection' not found

1: 檢查php是否安裝了amqp擴充套件

2: 在使用時一定要先引入自動載入的檔案: 如下
require __DIR__.'/../vendor/autoload.php';

報錯: The connection timed out after 3 sec while awaiting incoming data

原因: 由於我使用的是預設的使用者guest登入, 而該使用者不允許遠端登入;

解決:

# 進入docker(非docker執行的忽略此步驟)
docker exec -ti rabbitmq /bin/bash

# 增加使用者
rabbitmqctl add_user admin admin

# 設定tag(類似使用者組)
rabbitmqctl set_user_tags admin administrator

# 設定許可權
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

報錯: make的時候報/usr/bin/ld: cannot find -lrabbitmq錯誤

原因: 找不到庫檔案librabbitmq.so

解決: 參考https://www.cnblogs.com/chenhaoyu/p/13925905.html

1: 找到擴充套件的位置
find / -name librabbitmq.so

2: 將庫檔案所在的路徑加入到 /etc/ld.so.conf 尾部,並使之生效
echo '/disk2/temp/rabbitmq-c-master/build/librabbitmq/' >> /etc/ld.so.conf

3: 重新整理配置檔案使之生效
ldconfig

4: 修改環境變數,加入庫的檔案路徑
export LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH

檢視是否已經新增: export -p

5: 將上述 export 命令加入到配置檔案 ~/.bashrc,使之永久生效。
echo 'LIBRARY_PATH=/disk2/temp/rabbitmq-c-master/build/librabbitmq/:$LIBRARY_PATH' >> ~/.bashrc

6: 重新整理配置
source ~/.bashrc

參考資料

https://www.rabbitmq.com/tutorials

https://my.oschina.net/peaksoho/blog/2872689

https://blog.csdn.net/weixin_44600422/article/details/106317870

https://blog.csdn.net/weixin_33726313/article/details/91963653