python之RabbitMQ
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
RabbitMQ安裝
1.linux
安裝配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release -6-8.noarch.rpm
安裝erlang
$ yum -y install erlang
安裝RabbitMQ
$ yum -y install rabbitmq-server
service rabbitmq-server start/stop
2.安裝python API
pip install pika
or
easy_install pika
先來一個基於Queue實現生產者消費者模型試試水
#!/usr/bin/env python3
#coding:utf8
import queue
import threading
message = queue.Queue(10)
def producer (i):
‘‘‘廚師,生產包子放入隊列‘‘‘
while True:
message.put(i)
def consumer(i):
‘‘‘消費者,從隊列中取包子吃‘‘‘
while True:
msg = message.get()
for i in range(12): 廚師的線程包子
t = threading.Thread(target=producer, args=(i,))
t.start()
for i in range(10): 消費者的線程吃包子
t = threading.Thread(target=consumer, args=(i,))
t.start()
開始rabbitMQ
對於RabbitMQ來說,生產和消費不再針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。
一、最基本的生產者消費者
1.生產者代碼
#!/usr/bin/env python
import pika
# ######################### 生產者 #########################
#鏈接rabbit服務器(localhost是本機,如果是其他服務器請修改為ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建一個隊列名叫hello
channel.queue_declare(queue=‘hello‘)
#exchange -- 它使我們能夠確切地指定消息應該到哪個隊列去。
#向隊列插入數值 routing_key是隊列名 body是要插入的內容
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print("開始隊列")
#緩沖區已經flush而且消息已經確認發送到了RabbitMQ中,關閉鏈接
connection.close()
2.消費者代碼
#!/usr/bin/env python
import pika
# ########################## 消費者 ##########################
#鏈接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#如果生產者沒有運行創建隊列,那麽消費者也許就找不到隊列了。為了避免這個問題
#所有消費者也創建這個隊列
channel.queue_declare(queue=‘hello‘)
#接收消息需要使用callback這個函數來接收,他會被pika庫來調用
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
#從隊列取數據 callback是回調函數 如果拿到數據 那麽將執行callback函數
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=True)
print(‘ [*] 等待信息. To exit press CTRL+C‘)
#永遠循環等待數據處理和callback處理的數據
channel.start_consuming()
二、acknowledgment 消息不丟失的方法
no-ack = False,如果生產者遇到情況(關閉通道,連接關閉或TCP連接丟失))掛掉了,那麽,RabbitMQ會重新將該任務添加到隊列中。
1.生產者不變,但是還是復制上來吧
#!/usr/bin/env python
import pika
# ######################### 生產者 #########################
#鏈接rabbit服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建一個隊列名叫hello
channel.queue_declare(queue=‘hello‘)
#向隊列插入數值 routing_key是隊列名 body是要插入的內容
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print("開始隊列")
connection.close()
2.消費者
import pika
#鏈接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#如果生產者沒有運行創建隊列,那麽消費者創建隊列
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print ‘ok‘
ch.basic_ack(delivery_tag = method.delivery_tag) #主要使用此代碼
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=False)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
當生產者生成一條數據,被消費者接收,消費者中斷後如果不超過10秒,連接的時候數據還在。當超過10秒之後,重新鏈接,數據將消失。消費者等待鏈接。
三、durable 消息不丟失 (消息持久化)
這個 queue_declare 需要在 生產者(producer) 和消費方(consumer) 代碼中都進行設置。
1.生產者
#!/usr/bin/env python
import pika
#鏈接rabbit服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建隊列,使用durable方法
channel.queue_declare(queue=‘hello‘, durable=True)
#如果想讓隊列實現持久化那麽加上durable=True
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘,
properties=pika.BasicProperties(
delivery_mode=2,
#標記我們的消息為持久化的 - 通過設置 delivery_mode 屬性為 2
#這樣必須設置,讓消息實現持久化
))
#這個exchange參數就是這個exchange的名字. 空字符串標識默認的或者匿名的exchange:如果存在routing_key, 消息路由到routing_key指定的隊列中。
print(" [x] 開始隊列‘")
connection.close()
2.消費者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建隊列,使用durable方法
channel.queue_declare(queue=‘hello‘, durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print ‘ok‘
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=False)
print(‘ [*] 等待隊列. To exit press CTRL+C‘)
channel.start_consuming()
註:標記消息為持久化的並不能完全保證消息不會丟失,盡管告訴RabbitMQ保存消息到磁盤,當RabbitMQ接收到消息還沒有保存的時候仍然有一個短暫的時間窗口. RabbitMQ不會對每個消息都執行同步fsync(2) --- 可能只是保存到緩存cache還沒有寫入到磁盤中,這個持久化保證不是很強,但這比我們簡單的任務queue要好很多,如果你想很強的保證你可以使用 publisher confirms
四、消息獲取順序
默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列
1.生產者
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
# 設置隊列為持久化的隊列
channel.queue_declare(queue=‘task_queue‘, durable=True)
message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
routing_key=‘task_queue‘,
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # 設置消息為持久化的
))
print(" [x] Sent %r" % message)
connection.close()
2.消費者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘durable=True) # 設置隊列持久化
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print ‘ok‘
ch.basic_ack(delivery_tag = method.delivery_tag)
#表示誰來誰取,不再按照奇偶數排列
channel.basic_qos(prefetch_count=1)# 消息未處理完前不要發送信息的消息
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=False)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
交換 (Exchanges)
exchange類型可用: direct , topic , headers 和 fanout 。 我們將要對最後一種進行講解 --- fanout
一、消息發布訂閱
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
exchange type = fanout
1.發布者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.exchange_declare(exchange=‘logs‘,
type=‘fanout‘)
message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=‘logs‘,
routing_key=‘‘,
body=message)
print(" [x] Sent %r" % message)
connection.close()
2.訂閱者
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.exchange_declare(exchange=‘logs‘,
type=‘fanout‘)
result = channel.queue_declare(exclusive=True) #隊列斷開後自動刪除臨時隊列
queue_name = result.method.queue # 隊列名采用服務端分配的臨時隊列
channel.queue_bind(exchange=‘logs‘,
queue=queue_name)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
六、關鍵字發送
exchange type = direct
之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
1.生產者:
#!/usr/bin/env python3
#coding:utf8
#######################生產者#################
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.exchange_declare(exchange=‘direct_logs‘,
type=‘direct‘)
severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘direct_logs‘,
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
2.消費者:
#!/usr/bin/env python3
#coding:utf8
import pika
import sys
############消費者####
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.exchange_declare(exchange=‘direct_logs‘,
type=‘direct‘)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange=‘direct_logs‘,
queue=queue_name,
routing_key=severity)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
七、模糊匹配
exchange type = topic
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之後發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
# 表示可以匹配 0 個 或 多個 單詞
-
表示只能匹配 一個 單詞
發送者路由值 隊列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配
1.消費者
#!/usr/bin/env python3
#coding:utf8
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.exchange_declare(exchange=‘topic_logs‘,
type=‘topic‘)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange=‘topic_logs‘,
queue=queue_name,
routing_key=binding_key)
print(‘ [*] Waiting for logs. To exit press CTRL+C‘)
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
2.生產者
#!/usr/bin/env python3
#coding:utf8
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()
channel.exchange_declare(exchange=‘topic_logs‘,
type=‘topic‘)
routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘topic_logs‘,
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
更多內容:以下參考:
http://blog.csdn.net/songfreeman/article/details/50945025
work queue (用來在多個workers之間分發消息)
1.循環調度(Round-robin dispatching)
使用多個消費者來接收並處理消息
默認,RabbitMQ將循環的發送每個消息到下一個Consumer , 平均每個Consumer都會收到同樣數量的消息。 這種分發消息的方式成為 循環調度(round-robin)
-
生產者:
#!/usr/bin/env python3 #coding:utf8 import pika import sys #鏈接 connec = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connec.channel() #創建隊列 channel.queue_declare(queue=‘worker‘) #插入數據 message = ‘ ‘.join(sys.argv[1:]) or "Hello World" channel.basic_publish(exchange=‘‘, routing_key=‘worker‘, body=message, properties=pika.BasicProperties(delivery_mode = 2,) ) print(" [x] Send %r " % message)
-
消費者:
#!/usr/bin/env python3 #coding:utf8 import time import pika connect = pika.BlockingConnection(pika.ConnectionParameters (host=‘localhost‘)) channel = connect.channel() channel.queue_declare(‘worker‘) def callback(ch, method, properties,body): print(" [x] Received %r" % body) time.sleep(body.count(b‘.‘)) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=‘worker‘, ) channel.start_consuming()
執行的時候兩個消費者等待接收消息,
第一次生產者產生消息的時候被消費者1接收
第二次生產者產生消息的時候被消費者2接收
python之RabbitMQ