python使用rabbitmq例項六,遠端結果返回
前面的例子都有個共同點,就是傳送端傳送訊息出去後沒有結果返回。如果只是單純傳送訊息,當然沒有問題了,但是在實際中,常常會需要接收端將收到的訊息進行處理之後,返回給傳送端。
處理方法描述:傳送端在傳送資訊前,產生一個接收訊息的臨時佇列,該佇列用來接收返回的結果。其實在這裡接收端、傳送端的概念已經比較模糊了,因為傳送端也同樣要接收訊息,接收端同樣也要傳送訊息,所以這裡筆者使用另外的示例來演示這一過程。
示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個自然數N傳送給計算節點,計算節點將N值加1後,返回給控制中心。這裡用center.py模擬控制中心,compute.py模擬計算節點。
compute.py程式碼分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#!/usr/bin/env python
#coding=utf8
import
pika
#連線rabbitmq伺服器
connection
= pika.BlockingConnection(pika.ConnectionParameters(
host = 'localhost' ))
channel =
connection.channel()
#定義佇列
channel.queue_declare(queue = 'compute_queue' )
print
' [*] Waiting for n'
#將n值加1
def
increase(n):
return
n +
1
#定義接收到訊息的處理方法
def
request(ch, method, properties, body):
print
" [.] increase(%s)"
% (body,)
response
= increase( int (body))
#將計算結果傳送回控制中心
ch.basic_publish(exchange = '',
routing_key = properties.reply_to,
body = str (response))
ch.basic_ack(delivery_tag
= method.delivery_tag)
channel.basic_qos(prefetch_count = 1 )
channel.basic_consume(request, queue = 'compute_queue' )
channel.start_consuming()
|
計算節點的程式碼比較簡單,值得一提的是,原來的接收方法都是直接將訊息打印出來,這邊進行了加一的計算,並將結果傳送回控制中心。
center.py程式碼分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
#!/usr/bin/env python
#coding=utf8
import
pika
class
Center( object ):
def
__init__( self ):
self .connection
= pika.BlockingConnection(pika.ConnectionParameters(
host = 'localhost' ))
self .channel
= self .connection.channel()
#定義接收返回訊息的佇列
result
= self .channel.queue_declare(exclusive = True )
self .callback_queue
= result.method.queue
self .channel.basic_consume( self .on_response,
no_ack = True ,
queue = self .callback_queue)
#定義接收到返回訊息的處理方法
def
on_response( self , ch, method, props, body):
self .response
= body
def
request( self , n):
self .response
= None
#傳送計算請求,並宣告返回佇列
self .channel.basic_publish(exchange = '',
routing_key = 'compute_queue' ,
properties = pika.BasicProperties(
reply_to
= self .callback_queue,
),
body = str (n))
#接收返回的資料
while
self .response
is None :
self .connection.process_data_events()
return
int ( self .response)
center
= Center()
print
" [x] Requesting increase(30)"
response
= center.request( 30 )
print
" [.] Got %r" %
(response,)
|
上例程式碼定義了接收返回資料的佇列和處理方法,並且在傳送請求的時候將該佇列賦值給reply_to,在計算節點程式碼中就是通過這個引數來獲取返回佇列的。
開啟兩個終端,一個執行程式碼python compute.py,另外一個終端執行center.py,如果執行成功,應該就能看到效果了。
筆者在測試的時候,出了些小問題,就是在center.py傳送訊息時沒有指明返回佇列,結果compute.py那邊在計算完結果要發回資料時報錯,提示routing_key不存在,再次執行也報錯。用rabbitmqctl list_queues檢視佇列,發現compute_queue佇列有1條資料,每次重新執行compute.py的時候,都會重新處理這條資料。後來使用/etc/init.d/rabbitmq-server restart重新啟動下rabbitmq就ok了。