1. 程式人生 > >Python—RabbitMQ

Python—RabbitMQ

rabl art 代碼 作者 dir tro zha 8.4 通過

RabbitMQ

  RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統

安裝

  因為RabbitMQ由erlang實現,先安裝erlang

#安裝配置epel源
    rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
#安裝erlang
    yum -y install erlang
#安裝RabbitMQ
    yum -y install rabbitmq-server
#啟動/關閉
service rabbitmq-server start/stop

python使用rabbitmq服務,可以使用現成的類庫pika

#安裝pika
pip install pika    #pip是python的軟件管理包,如果沒有安裝,可以通過apt-get安裝

pika源碼地址https://pypi.python.org/pypi/pika

操作RabbitMQ

  對於RabbitMQ來說,生產和消費不再針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。

生產者

技術分享圖片
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
#生產者(發)

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘)) #連接rabbitmq服務器
channel = connection.channel()  #生成管道
#聲明queue,消息將在這個隊列中進行傳遞。如果將消息發送到不存在的隊列,rabbitmq將會自動清除這些消息
channel.queue_declare(queue=‘hello‘)#如果加上durable=True,服務器異常時,消息不丟失,持久化

#發送消息到上面聲明的hello隊列
#exchange表示交換器,能精確指定消息應該發送到哪個隊列,routing_key: 設置為隊列的名稱,body: 發送的內容
channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘)
print("Sent ‘Hello World!‘")
connection.close()  #關閉連接
技術分享圖片

消費者

技術分享圖片
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
#消費者(取)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘))
channel = connection.channel()   #生成管道
channel.queue_declare(queue=‘hello‘)

#回調函數
def callback(ch, method, properties, body):
    print("Received %r" % body)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=True)  #無應答 如果是False,在處理完後應答,如果沒應答,說明這次指令沒執行完,下次繼續發布
                                    #可以防止消息丟失
print(‘Waiting..............‘)

#開始接收信息,並進入阻塞狀態,隊列裏有信息才會調用callback進行處理
channel.start_consuming()
技術分享圖片

1、消息確認

  no-ack = False

去除no_ack=True參數或者設置為False,當工作者完成任務後,會反饋給rabbitmq(消息確認)

即使其中一個工作者退出了,正在執行的任務也不會丟失,RabbitMQ會重新將該任務添加到隊列中,分配給其他工作者。

2、消息持久化

  雖然有了消息確認,但是如果rabbitmq自身掛掉的話,那麽任務還是會丟失,所以需要將任務持久化存儲起來。

用delivery_mode=2來標記任務為持久化存儲

技術分享圖片
#聲明持久化存儲
channel.queue_declare(queue=‘hello‘, durable=True)#隊列持久化

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘,
                      #消息持久化
                      properties=pika.BasicProperties(
                          delivery_mode=2, #用delivery_mode=2來標記任務為持久化存儲
                      ))
技術分享圖片

3、消息獲取順序(公平調度)

使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工作者分配多個任務,只有工作者完成任務之後,才會再次接收到任務

技術分享圖片
#聲明持久化存儲
channel.queue_declare(queue=‘hello‘, durable=True)##隊列持久化

#回調函數
def callback(ch, method, properties, body):
    print("Received %r" % body)
    time.sleep(10)
    print(‘ok‘)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)#任務公平調度
技術分享圖片

4、發布訂閱

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

發布者

技術分享圖片
#!/usr/bin/env python
import pika
#發布者

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘))
channel = connection.channel() #生成管道

channel.exchange_declare(exchange=‘change_name‘,type=‘fanout‘)#type=‘fanout‘表示可以給多個隊列發數據

message = "Hello World!"

channel.basic_publish(exchange=‘change_name‘,#指定exchange,消息發給exchange,exchange發送給綁定了它的隊列
                      routing_key=‘‘,
                      body=message)
print("Sent %r" % message)
connection.close()
技術分享圖片

訂閱者

技術分享圖片
#!/usr/bin/env python
import pika
#訂閱者
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘))

channel = connection.channel() #生成管道
channel.exchange_declare(exchange=‘change_name‘,type=‘fanout‘)

#生成隨機queue_name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange=‘change_name‘,  #綁定exchange
                   queue=queue_name)
print(‘Waiting........‘)

def callback(ch, method, properties, body):
    print("%r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
技術分享圖片

5、關鍵字發送

  隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

發布者

技術分享圖片
#!/usr/bin/env python
import pika
#發布者

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘))
channel = connection.channel() #生成管道

channel.exchange_declare(exchange=‘direct_name‘,type=‘direct‘)#type=‘direct‘指定關鍵字發送

message = "Hello World!"

channel.basic_publish(exchange=‘direct_name‘,#指定exchange,消息發給exchange,exchange發送給綁定了它的隊列
                      routing_key=‘lisi‘,#指定關鍵字
                      body=message)
print("Sent %r" % message)
connection.close()
技術分享圖片

訂閱者

技術分享圖片
#!/usr/bin/env python
import pika
#訂閱者
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘172.16.8.47‘))
channel = connection.channel() #生成管道
channel.exchange_declare(exchange=‘direct_name‘,type=‘direct‘)#type=‘direct‘指定關鍵字發送

#生成隨機queue_name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange=‘direct_name‘,  #綁定exchange
                   queue=queue_name,
                   routing_key="zhangsan")
channel.queue_bind(exchange=‘direct_name‘,  #綁定exchange
                   queue=queue_name,
                   routing_key="lisi")
print(‘Waiting..........‘)

def callback(ch, method, properties, body):
    print("%r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
技術分享圖片

更多內容參照:http://www.cnblogs.com/wupeiqi/articles/5132791.html

Python—RabbitMQ