1. 程式人生 > >python使用rabbitmq例項六,遠端結果返回

python使用rabbitmq例項六,遠端結果返回

前面的例子都有個共同點,就是傳送端傳送訊息出去後沒有結果返回。如果只是單純傳送訊息,當然沒有問題了,但是在實際中,常常會需要接收端將收到的訊息進行處理之後,返回給傳送端。

處理方法描述:傳送端在傳送資訊前,產生一個接收訊息的臨時佇列,該佇列用來接收返回的結果。其實在這裡接收端、傳送端的概念已經比較模糊了,因為傳送端也同樣要接收訊息,接收端同樣也要傳送訊息,所以這裡筆者使用另外的示例來演示這一過程。

示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個自然數N傳送給計算節點,計算節點將N值加1後,返回給控制中心。這裡用center.py模擬控制中心,compute.py模擬計算節點。

compute.py程式碼分析

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #!/usr/bin/env python #coding=utf8 import pika #連線rabbitmq伺服器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定義佇列 channel.queue_declare(queue
='compute_queue') print ' [*] Waiting for n' #將n值加1 def increase(n): return n + 1 #定義接收到訊息的處理方法 def request(ch, method, properties, body): print " [.] increase(%s)"  % (body,) response = increase(int(body)) #將計算結果傳送回控制中心 ch.basic_publish(exchange='', routing_key=properties.reply_to, body=str
(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()

計算節點的程式碼比較簡單,值得一提的是,原來的接收方法都是直接將訊息打印出來,這邊進行了加一的計算,並將結果傳送回控制中心。

center.py程式碼分析

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #!/usr/bin/env python #coding=utf8 import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定義接收返回訊息的佇列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定義接收到返回訊息的處理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #傳送計算請求,並宣告返回佇列 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, ), body=str(n)) #接收返回的資料 while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() print " [x] Requesting increase(30)" response = center.request(30) print " [.] Got %r" % (response,)

上例程式碼定義了接收返回資料的佇列和處理方法,並且在傳送請求的時候將該佇列賦值給reply_to,在計算節點程式碼中就是通過這個引數來獲取返回佇列的。

開啟兩個終端,一個執行程式碼python compute.py,另外一個終端執行center.py,如果執行成功,應該就能看到效果了。

筆者在測試的時候,出了些小問題,就是在center.py傳送訊息時沒有指明返回佇列,結果compute.py那邊在計算完結果要發回資料時報錯,提示routing_key不存在,再次執行也報錯。用rabbitmqctl list_queues檢視佇列,發現compute_queue佇列有1條資料,每次重新執行compute.py的時候,都會重新處理這條資料。後來使用/etc/init.d/rabbitmq-server restart重新啟動下rabbitmq就ok了。