Translation: PHP RabbitMQ Tutorial 2

Work Queues

In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that distributes time-consuming tasks among multiple workers.

The main idea behind Work Queues (aka Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background pops tasks off the queue and executes them. When you run many workers, the tasks are shared between them.

This concept is especially useful in web applications, where a complex task cannot be completed within a short HTTP request window.

Preparation

In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, like images to be resized or PDF files to be rendered, so let's fake it by pretending we're busy, using the sleep() function. We'll take the number of dots in the string as its complexity: every dot accounts for one second of "work". For example, a fake task described by Hello... will take three seconds.
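The dot-counting rule can be sketched in a few lines of plain PHP. The function name fakeWorkSeconds is a hypothetical helper introduced only for illustration; the tutorial itself calls substr_count() inline inside the worker.

```php
<?php
// Hypothetical helper (not part of the tutorial's code): how many seconds
// of pretend work a task string stands for. One dot equals one second.
function fakeWorkSeconds(string $body): int
{
    return substr_count($body, '.');
}

echo fakeWorkSeconds('Hello...'), "\n";      // three dots, so 3
echo fakeWorkSeconds('Hello World!'), "\n";  // no dots, so 0
```

A worker would pass this number to sleep() to simulate the task.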

We will slightly modify the send.php code from our previous example so that arbitrary messages can be sent from the command line. This program will schedule tasks to our work queue, so let's name it new_task.php:

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array('delivery_mode' => 2) # make message persistent
                      );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

Our old receive.php script also requires some changes: it needs to fake a second of work for every dot in the message body. It will pop messages from the queue and perform the task, so let's call it worker.php:

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.')); # one second of fake work per dot
  echo " [x] Done", "\n";
};

# fourth argument (no_ack) is true: no acknowledgments yet
$channel->basic_consume('task_queue', '', false, true, false, false, $callback);

Note that our fake task only simulates execution time.

Run them as in tutorial one:

shell1$ php new_task.php "A very hard task which takes two seconds.."
shell2$ php worker.php

Round-robin dispatching

One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and scale easily.

First, let's try to run two worker.php scripts at the same time. They will both get messages from the queue, but how exactly? Let's see.

You need three consoles open. Two will run the worker.php script. These consoles will be our two consumers, C1 and C2.

shell1$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
shell2$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C

In the third one we'll publish new tasks. Once you've started the consumers you can publish a few messages:

shell3$ php new_task.php First message.
shell3$ php new_task.php Second message..
shell3$ php new_task.php Third message...
shell3$ php new_task.php Fourth message....
shell3$ php new_task.php Fifth message.....

Let's see what is delivered to our workers:

shell1$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ php worker.php
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
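To make the pattern concrete, here is a toy model of round-robin dispatch in plain PHP. It does not talk to RabbitMQ; the function roundRobin and the consumer labels are assumptions made for illustration. It only reproduces the assignment seen in the console output above, where message i goes to consumer i modulo the number of consumers.

```php
<?php
// Toy model of round-robin dispatch; this is not RabbitMQ code.
// Message i (counting from 0) goes to consumer i modulo the consumer count.
function roundRobin(array $messages, array $consumers): array
{
    $assigned = array_fill_keys($consumers, []);
    foreach (array_values($messages) as $i => $message) {
        $consumer = $consumers[$i % count($consumers)];
        $assigned[$consumer][] = $message;
    }
    return $assigned;
}

$result = roundRobin(
    ['First message.', 'Second message..', 'Third message...',
     'Fourth message....', 'Fifth message.....'],
    ['C1', 'C2']
);
print_r($result); // C1 gets the 1st, 3rd and 5th messages; C2 the 2nd and 4th
```

Passing three consumer labels instead of two shows how the same five messages would be spread across three workers.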

Message acknowledgment

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It's fine even if processing a message takes a very, very long time.

Until now we have turned message acknowledgments off ourselves. It's time to turn them on by setting the fourth parameter of basic_consume to false (true means no ack) and to send a proper acknowledgment from the worker once we're done with a task.

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
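The bookkeeping behind this guarantee can be pictured with a small in-memory model. This is only a sketch of the idea, not RabbitMQ's implementation: the class ToyBroker and all of its methods are hypothetical names invented for this example. A delivered message stays in an "unacked" list until the consumer acknowledges it, and it goes back to the queue if the consumer's connection is lost first.

```php
<?php
// Toy in-memory model of acknowledgment bookkeeping; not RabbitMQ's
// implementation. All class and method names here are hypothetical.
final class ToyBroker
{
    private array $ready = [];    // messages waiting to be delivered
    private array $unacked = [];  // consumer => [deliveryTag => message]
    private int $nextTag = 1;

    public function publish(string $message): void
    {
        $this->ready[] = $message;
    }

    // Hand the oldest ready message to a consumer and remember it as unacked.
    public function deliver(string $consumer): ?int
    {
        if ($this->ready === []) {
            return null;
        }
        $tag = $this->nextTag++;
        $this->unacked[$consumer][$tag] = array_shift($this->ready);
        return $tag;
    }

    // The consumer finished the task, so the message can be forgotten.
    public function ack(string $consumer, int $tag): void
    {
        unset($this->unacked[$consumer][$tag]);
    }

    // The consumer's connection died: put its unacked messages back in front.
    public function connectionLost(string $consumer): void
    {
        $lost = array_values($this->unacked[$consumer] ?? []);
        $this->ready = array_merge($lost, $this->ready);
        unset($this->unacked[$consumer]);
    }

    public function readyMessages(): array
    {
        return $this->ready;
    }
}

$broker = new ToyBroker();
$broker->publish('task A');
$broker->publish('task B');

$tagA = $broker->deliver('C1');  // C1 starts task A
$tagB = $broker->deliver('C2');  // C2 starts task B
$broker->ack('C2', $tagB);       // C2 finishes and acknowledges
$broker->connectionLost('C1');   // C1 is killed before acknowledging

print_r($broker->readyMessages()); // only 'task A' is waiting for redelivery
```

Task B is gone for good because it was acknowledged, while task A is available again for another worker.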

Forgotten acknowledgment

It's a common mistake to miss the basic_ack. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), and RabbitMQ will eat more and more memory because it cannot release any unacked messages.

In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

Message durability

We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if the RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and the messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. To do so, we declare it as durable by passing true as the third parameter to queue_declare:

$channel->queue_declare('hello', false, true, false, false);

Although this command is correct by itself, it won't work in our present setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround: let's declare a queue with a different name, for example task_queue:

$channel->queue_declare('task_queue', false, true, false, false);

This flag (the third parameter, set to true) needs to be applied in both the producer and the consumer code.

At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent by setting the message property delivery_mode = 2, which AMQPMessage takes as part of the property array (its second argument).

$msg = new AMQPMessage($data,
       array('delivery_mode' => 2) # make message persistent
       );

Note on message persistence

Marking messages as persistent doesn't fully guarantee that a message won't be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn't saved it yet. Also, RabbitMQ doesn't do fsync(2) for every message, so a message may only be saved to cache and not really written to the disk. The persistence guarantees aren't strong, but they are more than enough for our simple task queue. If you need a stronger guarantee you can wrap the publishing code in a transaction.

Fair dispatch

You might have noticed that the dispatching still doesn't work exactly as we want. For example, in a situation with two workers, when all odd messages are heavy and all even messages are light, one worker will be constantly busy and the other one will do hardly any work. RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

In order to defeat that we can use the basic_qos method with the prefetch_count = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. In other words, it won't dispatch a new message to a worker until that worker has processed and acknowledged the previous one. Instead, it will dispatch the message to the next worker that is not busy.

$channel->basic_qos(null, 1, null);
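The difference between the two strategies can be seen in a toy comparison written in plain PHP. Neither function talks to RabbitMQ, both names are hypothetical, and the model assumes that every message is already in the queue when the workers start. Each number is the seconds of work a message needs, with odd messages heavy and even ones light.

```php
<?php
// Toy comparison of the two dispatch strategies; not RabbitMQ code.
// Assumption: all messages are already queued when the workers start.

// Round-robin: message i always goes to worker i modulo the worker count.
function busyRoundRobin(array $durations, int $workers): array
{
    $busy = array_fill(0, $workers, 0);
    foreach (array_values($durations) as $i => $seconds) {
        $busy[$i % $workers] += $seconds;
    }
    return $busy;
}

// prefetch_count = 1: the next message goes to whichever worker frees up
// first (the lowest-numbered worker wins a tie).
function busyFairDispatch(array $durations, int $workers): array
{
    $busyUntil = array_fill(0, $workers, 0);
    foreach ($durations as $seconds) {
        $worker = array_search(min($busyUntil), $busyUntil, true);
        $busyUntil[$worker] += $seconds;
    }
    return $busyUntil;
}

$durations = [5, 1, 5, 1, 5, 1]; // odd messages heavy, even messages light

print_r(busyRoundRobin($durations, 2));   // worker 0: 15 s, worker 1: 3 s
print_r(busyFairDispatch($durations, 2)); // worker 0: 11 s, worker 1: 7 s
```

In this model round-robin finishes after 15 seconds with one worker nearly idle, while handing out one message at a time finishes after 11 seconds.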

Note about queue size

If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers or adopt some other strategy.

Putting it all together

Final code of our new_task.php file:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array('delivery_mode' => 2) # make message persistent
                      );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

?>

And our worker.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

Using message acknowledgments and prefetch you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.

Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers, much like a broadcast.