1. 程式人生 > >RbbitMQ 的 python 實現方法

RbbitMQ 的 python 實現方法

方法 獲得 開始 llb 建立 pic 原因 支持 sts

RbbitMQ(消息隊列)

#簡單模式
服務端
import pika
#連接
connection = pika.BlockingConnection(pika.ConnectionParameters( host=localhost))
連接通道
channel = connection.channel()
聲明隊列
channel.queue_declare(queue=hello)
發送數據    
channel.basic_publish(exchange=‘‘,
                      routing_key=hello,
                      body
=Hello World!) print(" [x] Sent ‘Hello World!‘") 結束連接 connection.close() # ########################## 客戶端 ########################## #獲得連接對象 connection = pika.BlockingConnection(pika.ConnectionParameters(host=localhost)) 獲得連接通道 channel = connection.channel() #聲明隊列 channel.queue_declare(queue=
hello) 回調函數 def callback(ch, method, properties, body): print(" [x] Received %r" % body) #從通道取出數據執行回調函數 channel.basic_consume( callback, queue=hello, #隊列名 no_ack=True) print( [*] Waiting for messages. To exit press CTRL+C) 在通道等待數據傳遞過來 channel.start_consuming()
#############################防止掉線客戶端######################################## #no-ack = False,如果消費者遇到情況掛掉了,那麽,RabbitMQ會重新將該任務添加到隊列中。 回調函數中的ch.basic_ack(delivery_tag=method.delivery_tag) basic_comsume中的no_ack=False import pika #連接 connection = pika.BlockingConnection(pika.ConnectionParameters( host=10.211.55.4)) 連接通道 channel = connection.channel() 聲明隊列 channel.queue_declare(queue=hello) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ok #執行這行代碼之後,才把數據銷毀 ch.basic_ack(delivery_tag = method.delivery_tag) 獲得管道數據執行回調函數 channel.basic_consume(callback, queue=hello, no_ack=False) print( [*] Waiting for messages. To exit press CTRL+C) 等待 channel.start_consuming() #########################durable :消息不丟失(服務端)########################################3 import pika 連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.4)) 連接通道 channel = connection.channel() 聲明隊列 channel.queue_declare(queue=hello, durable=True) push數據 channel.basic_publish(exchange=‘‘, #交換 routing_key=hello, body=Hello World!, 基礎屬性 properties=pika.BasicProperties( delivery_mode=2, # make message persistent #讓消息持久發送 )) print(" [x] Sent ‘Hello World!‘") connection.close() ##################################消息不丟失(客戶端)#############################################)############################################# import pika 連接 connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.4)) 通道 channel = connection.channel() 生成隊列 channel.queue_declare(queue=hello, durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ok #確認 ch.basic_ack(delivery_tag = method.delivery_tag) 基礎消耗方法,執行回調 channel.basic_consume(callback, queue=hello, no_ack=False) print( [*] Waiting for messages. To exit press CTRL+C) 開始消耗 channel.start_consuming() ################################## (3) 消息獲取順序 默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。 channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列 ################################客戶端################################## import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=10.211.55.4)) channel = connection.channel() # make message persistent channel.queue_declare(queue=hello) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print ok #確認, ch.basic_ack(delivery_tag = method.delivery_tag) #誰來誰取 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=hello, no_ack=False) print( [*] Waiting for messages. To exit press CTRL+C) channel.start_consuming() #############exchange模型############# exchange type = fanout #交換類型 #############服務端######################## import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=localhost)) channel = connection.channel() 聲明交流 channel.exchange_declare(exchange=logs, type=fanout) message = .join(sys.argv[1:]) or "info: Hello World!" push數據 channel.basic_publish(exchange=logs, #交流name routing_key=‘‘, body=message) print(" [x] Sent %r" % message) connection.close() ########################客戶端################################################## # 消費者 #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=localhost)) channel = connection.channel() 聲明交流 channel.exchange_declare(exchange=logs, type=fanout) #訂閱 聲明隊列 result = channel.queue_declare(exclusive=True) #隊列名 queue_name = result.method.queue 與隊列捆綁 channel.queue_bind(exchange=logs, queue=queue_name) print( [*] Waiting for logs. To exit press CTRL+C) def callback(ch, method, properties, body): print(" [x] %r" % body) 基本消耗 channel.basic_consume(callback, queue=queue_name, no_ack=True) #保護數據 #消耗通道 channel.start_consuming() #######################關鍵字發送################################# exchange type = direct 之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。 ###########################客戶端######################################## import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=localhost)) 連接通道 channel = connection.channel() 聲明交流 channel.exchange_declare(exchange=direct_logs, type=direct) #直接 聲明隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange=direct_logs, queue=queue_name, routing_key=severity) 就是這個 print( [*] Waiting for logs. To exit press CTRL+C) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) 消耗,獲取隊列信息回調 channel.basic_consume(callback, queue=queue_name, no_ack=True) 開始消耗 channel.start_consuming() ############################# 在 topic 類型下,可以讓隊列綁定幾個模糊的關鍵字,之後發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。 # 表示可以匹配 0 個 或 多個 單詞 * 表示只能匹配 一個 單詞 ##############################模糊查找############################################ import pika import sys 連接 connection = pika.BlockingConnection(pika.ConnectionParameters( host=localhost)) 管道 channel = connection.channel() 交流 channel.exchange_declare(exchange=topic_logs, type=topic) 話題 聲明隊列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: 與隊列捆綁 channel.queue_bind(exchange=topic_logs, queue=queue_name, routing_key=binding_key) #查找# print( [*] Waiting for logs. To exit press CTRL+C) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() ################################ 基於RabbitMQ的RPC############### 一個客戶端向服務器發送請求,服務器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那麽客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to。 ###################################服務器#### # 建立連接,服務器地址為localhost,可指定ip地址 connection = pika.BlockingConnection(pika.ConnectionParameters( host=localhost)) # 建立會話 channel = connection.channel() # 聲明RPC請求隊列 channel.queue_declare(queue=rpc_queue) # 數據處理方法 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) # 對RPC請求隊列中的請求進行處理 def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) # 調用數據處理方法 response = fib(n) # 將處理結果(響應)發送到回調隊列 ch.basic_publish(exchange=‘‘, routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) # 負載均衡,同一時刻發送給該服務器的請求不超過一個 channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue=rpc_queue) print(" [x] Awaiting RPC requests") channel.start_consuming() ################################################################## import pika import uuid class FibonacciRpcClient(object): def __init__(self): ”“” 客戶端啟動時,創建回調隊列,會開啟會話用於發送RPC請求以及接受響應 “”“ # 建立連接,指定服務器的ip地址 self.connection = pika.BlockingConnection(pika.ConnectionParameters( host=localhost)) # 建立一個會話,每個channel代表一個會話任務 self.channel = self.connection.channel() # 聲明回調隊列,再次聲明的原因是,服務器和客戶端可能先後開啟,該聲明是冪等的,多次聲明,但只生效一次 result = self.channel.queue_declare(exclusive=True) # 將次隊列指定為當前客戶端的回調隊列 self.callback_queue = result.method.queue # 客戶端訂閱回調隊列,當回調隊列中有響應時,調用`on_response`方法對響應進行處理; self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 對回調隊列中的響應進行處理的函數 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body # 發出RPC請求 def call(self, n): # 初始化 response self.response = None #生成correlation_id self.corr_id = str(uuid.uuid4()) # 發送RPC請求內容到RPC請求隊列`rpc_queue`,同時發送的還有`reply_to`和`correlation_id` self.channel.basic_publish(exchange=‘‘, routing_key=rpc_queue, properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) # 建立客戶端 fibonacci_rpc = FibonacciRpcClient() # 發送RPC請求 print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)

RbbitMQ 的 python 實現方法