1. 程式人生 > >python之RabbitMQ訊息佇列

python之RabbitMQ訊息佇列

RabbitMQ:訊息佇列

PY裡的佇列有:執行緒QUEUE、程序QUEUE

程序queue可以用於父程序與子程序進行互動,或者同屬於一父程序下多個子程序進行互動,但如果是兩個獨立的程式,是不能用這個QUEUE進行通訊的

兩個獨立的程式之間,要找一箇中間代理,比如可以用socket通訊,或者用json通訊(放在硬碟上,然後在讀取,速度慢),還有就是RabbitMQ訊息佇列

這個中間代理的好處在於:

1、完全封裝好,不用考慮訊息的處理。

2、可以多個訊息之間建立連線,而不必每兩個程式就要建立socket連線。

中間代理代表是rabbitMQ。

RabbitMQ在python裡的呼叫模組是pika

基本用法如下面:

生產者:

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()  #宣告一個管道

#在管道里,再宣告queue
channel.queue_declare(queue='hello',
                      # durable=True  #durable=True為佇列持續化
                      )

#真正發訊息,用管道發
channel.basic_publish(exchange='',
                      routing_key='hello',  #訊息佇列名稱
                      body='Hello World!',  #訊息內容
                      # properties=pika.BasicProperties(delivery_mode=2,) #訊息持續化
)

print('[x] sent "Hello World!"')
connection.close()

消費者:

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

#這裡再次宣告從那個佇列去收訊息(可以不寫,但必須要有這個佇列,不然會報錯)
channel.queue_declare(queue='hello',
                      # durable=True  #durable=True為佇列持續化
                      )

def callback(ch, method, properties, body):
    print('[x] 收到%r'%body)
    # 給佇列發一個確認執行完的資訊,否則訊息會儲存起來,不會被消費掉,會轉給下個消費者
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)

channel.basic_consume(
    callback,
    queue='hello',
    # no_ack=True   #取消傳送訊息中斷處理功能,不管有沒有處理完,都不會給伺服器端發確認
)

print('[*] Waiting for messages. Toexit press CTRL+C')
channel.start_consuming()

生產消費者模型預設是輪詢處理機制:如果有3個消費者,一個生產者,則是先給第一個啟動的消費者,在給第二個……


回撥函式(callback函式):表示訊息來了就調這個函式。

有的時候傳送訊息,會中斷,那麼RabbitMQ會有處理機制:在回撥函式最後加上ch.basic_ack(delivery_tag=method.delivery_tag),RabbitMQ在函式沒處理完就中斷的話,訊息會儲存起來,不會被消費掉,會轉給下個消費者。取消這個功能程式碼是:no_ack=True,代表不管有沒有處理完,都不會給伺服器端發確認(在伺服器端不關心訊息有沒有處理完的時候用)。

在sbin目錄下ctl是管理佇列的工具。命令列用rabbitmqctl.bat list_queues可以看到佇列和他的訊息有多少個。


如果伺服器突然斷開了,那麼怎麼辦呢。在每次宣告佇列的時候,加上佇列持續化durable=True。和訊息持續化:


怎麼樣能夠根據cpu處理速度調整發多少訊息呢?

在消費者的basic_consume方法前面加上:


怎麼吧一條訊息發給所有人(廣播)?

要用到exchange:

釋出方:要定義exchange,型別是fanout。

channel.exchange_declare(exchange='logs', type='fanout')

完整程式碼如下

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

接受方:直接看完整程式碼

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

#這裡是一樣的定義exchange
channel.exchange_declare(exchange='logs',
                         type='fanout')

#消費者還是要有一個queue,這裡宣告一個隨機queue
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue    #拿到queue

#然後要bind一個exchange,後面就可以開始收了
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 


如果想要選擇性的接收:接收方的type換成direct就行

 

Rpc:發一條訊息到遠端機器去執行,然後吧執行結果返回,這種模式叫rpc(remote procedure call)