Python—RabbitMQ
RabbitMQ
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統
安裝
因為RabbitMQ由erlang實現,先安裝erlang
#安裝配置epel源 rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm #安裝erlang yum -y install erlang
#安裝RabbitMQ yum -y install rabbitmq-server
#啟動/關閉 service rabbitmq-server start/stop
python使用rabbitmq服務,可以使用現成的類庫pika
#安裝pika pip install pika #pip是python的軟件管理包,如果沒有安裝,可以通過apt-get安裝
pika源碼地址https:
/
/
pypi.python.org
/
pypi
/
pika
操作RabbitMQ
對於RabbitMQ來說,生產和消費不再針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。
生產者
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika #生產者(發) connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘)) #連接rabbitmq服務器 channel = connection.channel() #生成管道 #聲明queue,消息將在這個隊列中進行傳遞。如果將消息發送到不存在的隊列,rabbitmq將會自動清除這些消息 channel.queue_declare(queue=‘hello‘)#如果加上durable=True,服務器異常時,消息不丟失,持久化 #發送消息到上面聲明的hello隊列 #exchange表示交換器,能精確指定消息應該發送到哪個隊列,routing_key: 設置為隊列的名稱,body: 發送的內容 channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘) print("Sent ‘Hello World!‘") connection.close() #關閉連接
消費者
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika #消費者(取) connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘)) channel = connection.channel() #生成管道 channel.queue_declare(queue=‘hello‘) #回調函數 def callback(ch, method, properties, body): print("Received %r" % body) channel.basic_consume(callback, queue=‘hello‘, no_ack=True) #無應答 如果是False,在處理完後應答,如果沒應答,說明這次指令沒執行完,下次繼續發布 #可以防止消息丟失 print(‘Waiting..............‘) #開始接收信息,並進入阻塞狀態,隊列裏有信息才會調用callback進行處理 channel.start_consuming()
1、消息確認
no-ack = False
去除no_ack=True參數或者設置為False,當工作者完成任務後,會反饋給rabbitmq(消息確認)
即使其中一個工作者退出了,正在執行的任務也不會丟失,RabbitMQ會重新將該任務添加到隊列中,分配給其他工作者。
2、消息持久化
雖然有了消息確認,但是如果rabbitmq自身掛掉的話,那麽任務還是會丟失,所以需要將任務持久化存儲起來。
用delivery_mode=2來標記任務為持久化存儲
#聲明持久化存儲 channel.queue_declare(queue=‘hello‘, durable=True)#隊列持久化 channel.basic_publish(exchange=‘‘, routing_key=‘hello‘, body=‘Hello World!‘, #消息持久化 properties=pika.BasicProperties( delivery_mode=2, #用delivery_mode=2來標記任務為持久化存儲 ))
3、消息獲取順序(公平調度)
使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務,只有工作者完成任務之後,才會再次接收到任務
#聲明持久化存儲 channel.queue_declare(queue=‘hello‘, durable=True)##隊列持久化 #回調函數 def callback(ch, method, properties, body): print("Received %r" % body) time.sleep(10) print(‘ok‘) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1)#任務公平調度
4、發布訂閱
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
發布者
#!/usr/bin/env python import pika #發布者 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘)) channel = connection.channel() #生成管道 channel.exchange_declare(exchange=‘change_name‘,type=‘fanout‘)#type=‘fanout‘表示可以給多個隊列發數據 message = "Hello World!" channel.basic_publish(exchange=‘change_name‘,#指定exchange,消息發給exchange,exchange發送給綁定了它的隊列 routing_key=‘‘, body=message) print("Sent %r" % message) connection.close()
訂閱者
#!/usr/bin/env python import pika #訂閱者 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘)) channel = connection.channel() #生成管道 channel.exchange_declare(exchange=‘change_name‘,type=‘fanout‘) #生成隨機queue_name result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘change_name‘, #綁定exchange queue=queue_name) print(‘Waiting........‘) def callback(ch, method, properties, body): print("%r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
5、關鍵字發送
隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
發布者
#!/usr/bin/env python import pika #發布者 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘)) channel = connection.channel() #生成管道 channel.exchange_declare(exchange=‘direct_name‘,type=‘direct‘)#type=‘direct‘指定關鍵字發送 message = "Hello World!" channel.basic_publish(exchange=‘direct_name‘,#指定exchange,消息發給exchange,exchange發送給綁定了它的隊列 routing_key=‘lisi‘,#指定關鍵字 body=message) print("Sent %r" % message) connection.close()
訂閱者
#!/usr/bin/env python import pika #訂閱者 connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘)) channel = connection.channel() #生成管道 channel.exchange_declare(exchange=‘direct_name‘,type=‘direct‘)#type=‘direct‘指定關鍵字發送 #生成隨機queue_name result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘direct_name‘, #綁定exchange queue=queue_name, routing_key="zhangsan") channel.queue_bind(exchange=‘direct_name‘, #綁定exchange queue=queue_name, routing_key="lisi") print(‘Waiting..........‘) def callback(ch, method, properties, body): print("%r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
更多內容參照:http://www.cnblogs.com/wupeiqi/articles/5132791.html
Python—RabbitMQ