RabbitMQ官方中文入門教程(PHP版) 第一部分:Hello World
RabbitMQ是一個訊息代理。它的核心原理非常簡單:接收和傳送訊息。你可以把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ是一個郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區別是,它處理的不是紙,而是接收、儲存和傳送二進位制的資料——訊息。一般提到RabbitMQ和訊息,都用到一些專有名詞。
- 生產(Producing)意思就是傳送。傳送訊息的程式就是一個生產者(producer)。我們一般用”P”來表示:
- 佇列(queue)就是郵箱的名稱。訊息通過你的應用程式和RabbitMQ進行傳輸,它們能夠只儲存在一個佇列(queue)中。
佇列(queue)
- 消費(Consuming)和獲取訊息是一樣的意思。一個消費者(consumer)就是一個等待獲取訊息的程式。我們把它畫作”C”:
Hello World!
(使用pika 0.9.5 Python客戶端)
我們的“Hello world”不會很複雜——僅僅傳送一個訊息,然後獲取它並輸出到螢幕。這樣以來我們需要兩個程式,一個用作傳送訊息,另一個接受訊息並列印訊息內容
我們大體的設計是這樣的:
生產者(Producer)把訊息傳送到一個名為“hello”的佇列中。消費者(consumer)從這個佇列中獲取訊息。
RabbitMQ庫
RabbitMQ使用的是AMQP協議。要使用她你就必須需要一個使用同樣協議的庫。幾乎所有的程式語言都有可選擇的庫。
python也是一樣,可以從以下幾個庫中選擇:在這一系列教程中,我們打算使用PHP 的AMQP擴充套件。詳細教程請檢視:
傳送訊息
我們第一個程式send.php會發送一個訊息到佇列中。首先要做的事情就是建立一個到RabbitMQ伺服器的連線。
1 |
$connection
= new AMQPConnection( array ( 'host'
=> '127.0.0.1' ,
'port' => '5672' ,
'vhost' => '/' ,
'login' => 'guest' ,
'password' => 'guest' )); |
現在我們已經連線上伺服器了,那麼,在傳送訊息之前我們需要確認佇列是存在的。如果我們把訊息傳送到一個不存在的佇列,RabbitMQ會丟棄這條訊息。我門先建立一個名為hello的佇列,然後把訊息傳送到這個佇列中。
1 |
$queue =
new AMQPQueue( $channel ); |
2 |
$queue ->setName( $queueName ); |
這時候我們就可以傳送訊息了,我們第一條訊息只包含了 Hello World!字串,我們打算把它傳送到我們的hello佇列。
在RabbitMQ中,訊息是不能直接傳送到佇列,它需要傳送到交換器(exchange)中。我們不打算在這裡深入討論它——你可以通過教程的第三部分瞭解更多。現在我們所需要了解的是如何使用預設的交換器(exchange),它使用一個空字串來標識。交換器允許我們指定某條訊息需要投遞到哪個佇列,$$routeKey引數必須指定為佇列的名稱:
1 |
$exchange ->publish( $message ,
$routeKey ); |
2 |
var_dump( "[x] Sent 'Hello World!'" ); |
在退出程式之前,我們需要確認網路緩衝已經被刷寫、訊息已經投遞到RabbitMQ。完成這些事情(正確的關閉連線)是很簡單的。
1 |
$connection ->disconnect(); |
獲取資料
我們的第二個程式receive.php,將會從佇列中獲取訊息並列印訊息。
這次我們還是先要連線到RabbitMQ伺服器。連線伺服器的程式碼和之前是一樣的。
下一步也和之前一樣,我們需要確認佇列是存在的。使用$queue->declare()建立一個佇列——我們可以執行這個命令很多次,但是隻有一個佇列會建立。
1 |
$queue =
new AMQPQueue( $channel ); |
2 |
$queue ->setName( $queueName ); |
3 |
$queue -> declare (); |
你也許要問為什麼重複聲明瞭佇列——我們已經在前面的程式碼中聲明瞭它。如果我們確定了佇列是已經存在的,那麼我們可以不這麼做。比如先執行send.php程式。可是我們並不確定哪個程式先執行,這種情況的話再程式中重複宣告是好的做法。
列出所有佇列
你也許希望檢視RabbitMQ由哪些佇列、有多少訊息在佇列中。你可以使用rabbitmqctl工具(使用有許可權的使用者):
1
$
sudo
rabbitmqctl list_queues
2
Listing queues ...
3
hello 0
4
...
done
.
(omit sudo on Windows)
(在Windows中不需要sudo命令)
從佇列中獲取訊息相對來說稍顯複雜。需要為佇列定義一個回撥(callback)函式。當我們獲取到訊息的時候,Pika庫就會呼叫這個回撥(callback)函式。我們的這個回撥函式將會但因訊息的內容到螢幕上。
1 |
function callback( $envelope ,
$queue ) { |
2 |
$msg
= $envelope ->getBody(); |
3 |
var_dump( " [x] Received:"
. $msg ); |
4 |
$queue ->nack( $envelope ->getDeliveryTag()); |
5 |
} |
下一步,我們需要告訴RabbitMQ這個回撥函式將會從hello佇列中接收訊息:
1 |
$queue ->consume( 'callback' ); |
要成功執行這些命令,我們必須保證佇列是存在的,我們已經能夠保證——我們之前已經使用建立了一個佇列queue_declare。
$queue->nack()函式稍後會介紹。
最後,我們輸入一個無限迴圈來等待訊息資料並確執行回撥函式。
1 |
var_dump( '[*] Waiting for messages. To exit press CTRL+C' ); |
2 |
while (TRUE) { |
3 |
$queue ->consume( 'callback' ); |
4 |
} |
整合
send.php的全部程式碼:
01 |
<?php |
02 |
03 |
/** |
04 |
* PHP amqp(RabbitMQ) Demo-1 |
05 |
* @author yuansir <[email protected]/yuansir-web.com> |
06 |
*/ |
07 |
$exchangeName
= 'demo' ; |
08 |
$queueName
= 'hello' ; |
09 |
$routeKey =
'hello' ; |
10 |
$message =
'Hello World!' ; |
11 |
12 |
$connection
= new AMQPConnection( array ( 'host'
=> '127.0.0.1' ,
'port' => '5672' ,
'vhost' => '/' ,
'login' => 'guest' ,
'password' => 'guest' )); |
13 |
$connection ->connect()
or die ( "Cannot connect to the broker!\n" ); |
14 |
15 |
try { |
16 |
$channel
= new
AMQPChannel( $connection ); |
17 |
$exchange
= new
AMQPExchange( $channel ); |
18 |
$exchange ->setName( $exchangeName ); |
19 |
$queue
= new
AMQPQueue( $channel ); |
20 |
$queue ->setName( $queueName ); |
21 |
$exchange ->publish( $message ,
$routeKey ); |
22 |
var_dump( "[x] Sent 'Hello World!'" ); |
23 |
} catch (AMQPConnectionException $e ) { |
24 |
var_dump( $e ); |
25 |
exit (); |
26 |
} |
27 |
$connection ->disconnect(); |
receive.py的全部程式碼:
01 |
<?php |
02 |
03 |
/** |
04 |
* PHP amqp(RabbitMQ) Demo-1 |
05 |
* @author yuansir <[email protected]/yuansir-web.com> |
06 |
*/ |
07 |
$exchangeName
= 'demo' ; |
08 |
$queueName
= 'hello' ; |
09 |
$routeKey =
'hello' ; |
10 |
11 |
$connection
= new AMQPConnection( array ( 'host'
=> '127.0.0.1' ,
'port' => '5672' ,
'vhost' => '/' ,
'login' => 'guest' ,
'password' => 'guest' )); |
12 |
$connection ->connect()
or die ( "Cannot connect to the broker!\n" ); |
13 |
$channel =
new AMQPChannel( $connection ); |
14 |
$exchange =
new AMQPExchange( $channel ); |
15 |
$exchange ->setName( $exchangeName ); |
16 |
$exchange ->setType(AMQP_EX_TYPE_DIRECT); |
17 |
$exchange -> declare (); |
18 |
$queue =
new AMQPQueue( $channel ); |
19 |
$queue ->setName( $queueName ); |
20 |
$queue -> declare (); |
21 |
$queue ->bind( $exchangeName ,
$routeKey ); |
22 |
23 |
var_dump( '[*] Waiting for messages. To exit press CTRL+C' ); |
24 |
while (TRUE) { |
25 |
$queue ->consume( 'callback' ); |
26 |
} |
27 |
$connection ->disconnect(); |
28 |
29 |
function callback( $envelope ,
$queue ) { |
30 |
$msg
= $envelope ->getBody(); |
31 |
var_dump( " [x] Received:"
. $msg ); |
32 |
$queue ->nack( $envelope ->getDeliveryTag()); |
33 |
} |
現在就可以在終端中執行我們的程式了。首先,用send.php重續傳送一條訊息:
1 |
php send.php |
2 |
string(23) "[x] Sent 'Hello World!'" </pre> |
生產者(producer)程式send.php每次執行之後就會停止。現在我們就來接收訊息:
1 |
php receive.php |
2 |
string(46) "[*] Waiting for messages. To exit press CTRL+C" |
3 |
string(26) " [x] Received:Hello World!" </pre> |
成功了!我們已經通過RabbitMQ傳送第一條訊息。你也許已經注意到了,receive.py程式並沒有退出。它一直在準備獲取訊息,你可以通過Ctrl-C來終端它。
試下在新的終端中再次執行send.php。
我們已經學會如何傳送訊息到一個已知佇列中並接收訊息。是時候移步到第二部分了,我們將會建立一個簡單的工作佇列(work queue)。