1. 程式人生 > >Rabbitmq_02 Work Queues

Rabbitmq_02 Work Queues

  上篇部落格講述了最簡單的Rabbitmq使用,一個傳送者對應一個接收者。

  但Rabbitmq沒有規定一個佇列的接受者的數量,意味著可以出現一個傳送者多個接收者的情況。

  就像任務佇列,一個程序生成任務並放到佇列中,多個程序從佇列中讀取任務並執行。

 

傳送者

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 注意宣告中的durable引數 channel.queue_declare(queue
='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" # 注意傳送訊息時的第四個引數。 channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode
= 2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()

 

 

接收者

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue
='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") # 開啟ack標誌之後,一定要記住傳送ack ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()

  

分配模式

  如果佇列有多個接收者,那麼訊息如何在接收者之間進行分配?

  預設情況下采用Round-robin模式,意思就是在所有的接收者之間進行輪尋,依次分配。

  比如佇列有兩個接收者a,b;如果向佇列中傳送五條訊息12345,第一條分配到a,第二條分配到b,第三條a,如此類推。最終a接收135, b接收24。

 

  另一中分配模式更注重“公平”,在上面的分配方式中,如果奇數訊息處理時耗時長,偶數訊息耗時短。兩個接收者,則第一個接收者將一直接收繁重的任務。

  此時如果指定channel.basic_qos(prefetch_count=1)引數,則在接收到接收訊息返回ack之前,不會分配給該接收者訊息,往後尋找現在空閒的接收者。

 

訊息確認

  仔細對比接收者的basic_consume方法和上一篇部落格的區別,發現沒有第三個。

  就是說預設情況下,採用訊息確認機制。即訊息被讀取後,並不會立即刪除,而是等待接收者返回ack。確保該訊息被正確接收之後,再刪除。如果Rabbitmq判斷某個接收者失聯,會將之前傳送給該程序且沒有收到返回ack的訊息重新發送給其他程序。

  等待確認沒有超時機制,即如果接收者一直存活,但總不回覆ack,訊息將會一直存在於記憶體中。因此一定要記住,預設模式下,接收者收到訊息之後,呼叫basic_ack(delivery_tag = method.delivery_tag)方法返回ack。

  sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged命令會檢視目前的佇列,以及佇列中已確認和未確認的訊息數。

 

Rabbitmq持久化

  持久化分為兩部分,對佇列持久和對訊息持久。

  channel.queue_declare(queue='task_queue', durable=True)中的durable引數指明瞭佇列持久化,注意接收者中也要進行一樣的宣告。即使Rabbitmq重啟,task_queue佇列依然存在。

  訊息的持久,請看傳送者傳送時的第四個引數,他指明瞭Rabbitmq傳送時,會將訊息同步到硬碟。注意這種同步並不是每次都真的同步到硬碟,只是將訊息放到了系統緩衝,由系統決定何時將緩衝內容刷盤。