rabbitMQ 生產及消費程式碼示例
阿新 • • 發佈:2020-10-22
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # durable:server掛了佇列仍然存在 channel.queue_declare(queue='task_queue', durable=True) # 使用預設的交換機發送訊息。exchange為空就使用預設的。delivery_mode=2:使訊息持久化。和佇列名稱繫結routing_key message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, )) print(" [x] Sent %r" % message) connection.close()
消費端:
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") # 手動對訊息進行確認 ch.basic_ack(delivery_tag=method.delivery_tag) # basic_consume:這個函式有no_ack引數。該引數預設為false。表示:需要對message進行確認。怎麼理解:no設定成false,表示要確認 channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
說明:
個人覺得rabbitMQ比較好的一點是能夠對消費的資訊進行反饋,如果消費端的程式執行失敗了,還可以重複消費.