1. 程式人生 > >Python操作rabbitmq系列(六):進行RPC調用

Python操作rabbitmq系列(六):進行RPC調用

block 異常 遠程 轉換 調用 成了 mage chang 多少

此刻,我們已經進入第6章,是官方的最後一個環節,但是,並非本系列的最後一個環節。因為在實戰中還有一些經驗教訓,並沒體現出來。由於馬上要給同事沒培訓celery了。我也來不及寫太多。等後面,我們再慢慢補充。

RPC:是遠程過程調用。百度寫了一大堆。此刻,我們簡單點說:比如,我們在本地的代碼中調用一個函數,那麽這個函數不一定有返回值,但一定有返回。若是在分布式環境中,香我們前幾章的例子,發送消息出去後,發送端是不清楚客戶端處理完後的結果的。由於rabbitmq的響應機制,頂多能獲取到客戶端的處理狀態,但並不能獲取處理結果。那麽,我們想像本地調用那樣,需要客戶端處理後返回結果該怎麽辦呢。就是如下圖:

技術分享

client發送請求,同時告訴server處理完後要發送消息給:回調隊列的ID:correlation_id=abc,並調用replay_to回調隊列對應的回調函數。請上代碼:

客戶端:

客戶端:發消息也收消息

import pika
import uuid


class FibonacciRpcClient(object):
def __init__(self):
# 創建連接
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))

self.channel = self.connection.channel()

# 創建回調隊列
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue

# 這裏:這個是消息發送方,當要執行回調的時候,它又是接收方
# 使用callback_queue 實現消息接收。即是回調。註意:這裏的回調
# 不需要對消息進行確認。反復確認,沒玩沒了就成了死循環

#這裏設置回調
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)

# 定義回調的響應函數。
# 判斷:若是當前的回調ID和響應的回調ID相同,即表示,是本次請求的回調
# 原因:若是發起上百個請求,發送端總得知道回來的對應的是哪一個發送的
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
# 設置響應和回調通道的ID
self.response = None
self.corr_id = str(uuid.uuid4())
# properties中指定replay_to:表示回調要調用那個函數
# 指定correlation_id:表示回調返回的請求ID是那個
# body:是要交給接收端的參數
self.channel.basic_publish(exchange=‘‘,
routing_key=‘rpc_queue‘,
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))

# 監聽回調
while self.response is None:
self.connection.process_data_events()

# 返回的結果是整數,這裏進行強制轉換
return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

服務端:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))

channel = connection.channel()

channel.queue_declare(queue=‘rpc_queue‘)


def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):

#收到的消息
n = int(body)

print(" [.] fib(%s)" % n)

#要處理的任務
response = fib(n)

#發布消息。通知到客戶端
ch.basic_publish(exchange=‘‘,
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= props.correlation_id),
body=str(response))

#手動響應
ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=‘rpc_queue‘)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

結果:

技術分享

OK,我們的rabbitmq系列,就暫時告一段落。這其中還有一個實際的問題,我們還沒有解決。就是:一個消息到達隊列,我們需要多少個消費端去處理,這些消費端又該如何進行管理,比如:那些消費端工作已經做完,那些有出異常掛掉,隊列除了使用prefetch_count屬性又該如何進一步來平衡各消費端的負載等等。看樣子我們還有很多事要做

Python操作rabbitmq系列(六):進行RPC調用