python之RabbitMQ訊息佇列
RabbitMQ:訊息佇列
PY裡的佇列有:執行緒QUEUE、程序QUEUE
程序queue可以用於父程序與子程序進行互動,或者同屬於一父程序下多個子程序進行互動,但如果是兩個獨立的程式,是不能用這個QUEUE進行通訊的。
兩個獨立的程式之間,要找一箇中間代理,比如可以用socket通訊,或者用json通訊(放在硬碟上,然後在讀取,速度慢),還有就是RabbitMQ訊息佇列。
這個中間代理的好處在於:
1、完全封裝好,不用考慮訊息的處理。
2、可以多個訊息之間建立連線,而不必每兩個程式就要建立socket連線。
中間代理代表是rabbitMQ。
RabbitMQ在python裡的呼叫模組是pika
基本用法如下面:
生產者:
import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() #宣告一個管道 #在管道里,再宣告queue channel.queue_declare(queue='hello', # durable=True #durable=True為佇列持續化 ) #真正發訊息,用管道發 channel.basic_publish(exchange='', routing_key='hello', #訊息佇列名稱 body='Hello World!', #訊息內容 # properties=pika.BasicProperties(delivery_mode=2,) #訊息持續化 ) print('[x] sent "Hello World!"') connection.close()
消費者:
import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() #這裡再次宣告從那個佇列去收訊息(可以不寫,但必須要有這個佇列,不然會報錯) channel.queue_declare(queue='hello', # durable=True #durable=True為佇列持續化 ) def callback(ch, method, properties, body): print('[x] 收到%r'%body) # 給佇列發一個確認執行完的資訊,否則訊息會儲存起來,不會被消費掉,會轉給下個消費者 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue='hello', # no_ack=True #取消傳送訊息中斷處理功能,不管有沒有處理完,都不會給伺服器端發確認 ) print('[*] Waiting for messages. Toexit press CTRL+C') channel.start_consuming()
生產消費者模型預設是輪詢處理機制:如果有3個消費者,一個生產者,則是先給第一個啟動的消費者,在給第二個……
回撥函式(callback函式):表示訊息來了就調這個函式。
有的時候傳送訊息,會中斷,那麼RabbitMQ會有處理機制:在回撥函式最後加上ch.basic_ack(delivery_tag=method.delivery_tag),RabbitMQ在函式沒處理完就中斷的話,訊息會儲存起來,不會被消費掉,會轉給下個消費者。取消這個功能程式碼是:no_ack=True,代表不管有沒有處理完,都不會給伺服器端發確認(在伺服器端不關心訊息有沒有處理完的時候用)。
在sbin目錄下ctl是管理佇列的工具。命令列用rabbitmqctl.bat list_queues可以看到佇列和他的訊息有多少個。
如果伺服器突然斷開了,那麼怎麼辦呢。在每次宣告佇列的時候,加上佇列持續化durable=True。和訊息持續化:
怎麼樣能夠根據cpu處理速度調整發多少訊息呢?
在消費者的basic_consume方法前面加上:
怎麼吧一條訊息發給所有人(廣播)?
要用到exchange:
釋出方:要定義exchange,型別是fanout。
channel.exchange_declare(exchange='logs', type='fanout')
完整程式碼如下
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
接受方:直接看完整程式碼
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
#這裡是一樣的定義exchange
channel.exchange_declare(exchange='logs',
type='fanout')
#消費者還是要有一個queue,這裡宣告一個隨機queue
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue #拿到queue
#然後要bind一個exchange,後面就可以開始收了
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
如果想要選擇性的接收:接收方的type換成direct就行
Rpc:發一條訊息到遠端機器去執行,然後吧執行結果返回,這種模式叫rpc(remote procedure call)