1. 程式人生 > >RabbitMQ官方中文入門教程(PHP版) 第一部分:Hello World

RabbitMQ官方中文入門教程(PHP版) 第一部分:Hello World

RabbitMQ是一個訊息代理。它的核心原理非常簡單:接收和傳送訊息。你可以把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ是一個郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區別是,它處理的不是紙,而是接收、儲存和傳送二進位制的資料——訊息。一般提到RabbitMQ和訊息,都用到一些專有名詞。

  • 生產(Producing)意思就是傳送。傳送訊息的程式就是一個生產者(producer)。我們一般用”P”來表示:
  • 佇列(queue)就是郵箱的名稱。訊息通過你的應用程式和RabbitMQ進行傳輸,它們能夠只儲存在一個佇列(queue)中。 佇列(queue)
    沒有任何限制,你要儲存多少訊息都可以——基本上是一個無限的緩衝。多個生產者(producers)能夠把訊息傳送給同一個佇列,同樣,多個消費者(consumers)也能攻從一個佇列(queue)中獲取資料。佇列可以化城這樣(圖上是佇列的名稱):
  • 消費(Consuming)和獲取訊息是一樣的意思。一個消費者(consumer)就是一個等待獲取訊息的程式。我們把它畫作”C”:

Hello World!

(使用pika 0.9.5 Python客戶端)

我們的“Hello world”不會很複雜——僅僅傳送一個訊息,然後獲取它並輸出到螢幕。這樣以來我們需要兩個程式,一個用作傳送訊息,另一個接受訊息並列印訊息內容

我們大體的設計是這樣的:

生產者(Producer)把訊息傳送到一個名為“hello”的佇列中。消費者(consumer)從這個佇列中獲取訊息。

RabbitMQ庫

RabbitMQ使用的是AMQP協議。要使用她你就必須需要一個使用同樣協議的庫。幾乎所有的程式語言都有可選擇的庫。python也是一樣,可以從以下幾個庫中選擇:

在這一系列教程中,我們打算使用PHPAMQP擴充套件。詳細教程請檢視:

傳送訊息

我們第一個程式send.php會發送一個訊息到佇列中。首先要做的事情就是建立一個到RabbitMQ伺服器的連線。

1 $connection = new AMQPConnection(
array('host' =>'127.0.0.1', 'port' =>'5672', 'vhost' =>'/', 'login' =>'guest', 'password' => 'guest'));

現在我們已經連線上伺服器了,那麼,在傳送訊息之前我們需要確認佇列是存在的。如果我們把訊息傳送到一個不存在的佇列,RabbitMQ會丟棄這條訊息。我門先建立一個名為hello的佇列,然後把訊息傳送到這個佇列中。

1 $queue = new AMQPQueue($channel);
2 $queue->setName($queueName);

這時候我們就可以傳送訊息了,我們第一條訊息只包含了 Hello World!字串,我們打算把它傳送到我們的hello佇列。

在RabbitMQ中,訊息是不能直接傳送到佇列,它需要傳送到交換器(exchange)中。我們不打算在這裡深入討論它——你可以通過教程的第三部分瞭解更多。現在我們所需要了解的是如何使用預設的交換器(exchange),它使用一個空字串來標識。交換器允許我們指定某條訊息需要投遞到哪個佇列,$$routeKey引數必須指定為佇列的名稱:

1 $exchange->publish($message, $routeKey);
2 var_dump("[x] Sent 'Hello World!'");

在退出程式之前,我們需要確認網路緩衝已經被刷寫、訊息已經投遞到RabbitMQ。完成這些事情(正確的關閉連線)是很簡單的。

1 $connection->disconnect();

獲取資料

我們的第二個程式receive.php,將會從佇列中獲取訊息並列印訊息。

這次我們還是先要連線到RabbitMQ伺服器。連線伺服器的程式碼和之前是一樣的。

下一步也和之前一樣,我們需要確認佇列是存在的。使用$queue->declare()建立一個佇列——我們可以執行這個命令很多次,但是隻有一個佇列會建立。

1 $queue = new AMQPQueue($channel);
2 $queue->setName($queueName);
3 $queue->declare();

你也許要問為什麼重複聲明瞭佇列——我們已經在前面的程式碼中聲明瞭它。如果我們確定了佇列是已經存在的,那麼我們可以不這麼做。比如先執行send.php程式。可是我們並不確定哪個程式先執行,這種情況的話再程式中重複宣告是好的做法。

列出所有佇列

你也許希望檢視RabbitMQ由哪些佇列、有多少訊息在佇列中。你可以使用rabbitmqctl工具(使用有許可權的使用者):

1 $ sudo rabbitmqctl list_queues
2 Listing queues ...
3 hello    0
4 ...done.

(omit sudo on Windows)

(在Windows中不需要sudo命令)

從佇列中獲取訊息相對來說稍顯複雜。需要為佇列定義一個回撥(callback)函式。當我們獲取到訊息的時候,Pika庫就會呼叫這個回撥(callback)函式。我們的這個回撥函式將會但因訊息的內容到螢幕上。

1 function callback($envelope, $queue) {
2 $msg = $envelope->getBody();
3 var_dump(" [x] Received:" . $msg);
4 $queue->nack($envelope->getDeliveryTag());
5 }

下一步,我們需要告訴RabbitMQ這個回撥函式將會從hello佇列中接收訊息:

1 $queue->consume('callback');

要成功執行這些命令,我們必須保證佇列是存在的,我們已經能夠保證——我們之前已經使用建立了一個佇列queue_declare。

$queue->nack()函式稍後會介紹。

最後,我們輸入一個無限迴圈來等待訊息資料並確執行回撥函式。

1 var_dump('[*] Waiting for messages. To exit press CTRL+C');
2 while (TRUE) {
3 $queue->consume('callback');
4 }

整合

send.php的全部程式碼:

01 <?php
02
03 /**
04 * PHP amqp(RabbitMQ) Demo-1
05 * @author  yuansir <[email protected]/yuansir-web.com>
06 */
07 $exchangeName = 'demo';
08 $queueName = 'hello';
09 $routeKey = 'hello';
10 $message = 'Hello World!';
11
12 $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
13 $connection->connect() or die("Cannot connect to the broker!\n");
14
15 try {
16 $channel = new AMQPChannel($connection);
17 $exchange = new AMQPExchange($channel);
18 $exchange->setName($exchangeName);
19 $queue = new AMQPQueue($channel);
20 $queue->setName($queueName);
21 $exchange->publish($message, $routeKey);
22 var_dump("[x] Sent 'Hello World!'");
23 } catch (AMQPConnectionException $e) {
24 var_dump($e);
25 exit();
26 }
27 $connection->disconnect();

receive.py的全部程式碼:

01 <?php
02
03 /**
04 * PHP amqp(RabbitMQ) Demo-1
05 * @author  yuansir <[email protected]/yuansir-web.com>
06 */
07 $exchangeName = 'demo';
08 $queueName = 'hello';
09 $routeKey = 'hello';
10
11 $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
12 $connection->connect() or die("Cannot connect to the broker!\n");
13 $channel = new AMQPChannel($connection);
14 $exchange = new AMQPExchange($channel);
15 $exchange->setName($exchangeName);
16 $exchange->setType(AMQP_EX_TYPE_DIRECT);
17 $exchange->declare();
18 $queue = new AMQPQueue($channel);
19 $queue->setName($queueName);
20 $queue->declare();
21 $queue->bind($exchangeName, $routeKey);
22
23 var_dump('[*] Waiting for messages. To exit press CTRL+C');
24 while (TRUE) {
25 $queue->consume('callback');
26 }
27 $connection->disconnect();
28
29 function callback($envelope, $queue) {
30 $msg = $envelope->getBody();
31 var_dump(" [x] Received:" . $msg);
32 $queue->nack($envelope->getDeliveryTag());
33 }

現在就可以在終端中執行我們的程式了。首先,用send.php重續傳送一條訊息:

1 php send.php
2 string(23) "[x] Sent 'Hello World!'"</pre>

生產者(producer)程式send.php每次執行之後就會停止。現在我們就來接收訊息:

1 php receive.php
2 string(46) "[*] Waiting for messages. To exit press CTRL+C"
3 string(26) " [x] Received:Hello World!"</pre>

成功了!我們已經通過RabbitMQ傳送第一條訊息。你也許已經注意到了,receive.py程式並沒有退出。它一直在準備獲取訊息,你可以通過Ctrl-C來終端它。

試下在新的終端中再次執行send.php。

我們已經學會如何傳送訊息到一個已知佇列中並接收訊息。是時候移步到第二部分了,我們將會建立一個簡單的工作佇列(work queue)