1. 程式人生 > >RabbitMQ遠程執行任務RPC。

RabbitMQ遠程執行任務RPC。

字符 col declare 發送 elf 過程調用 client nbsp form

如果想發一條命令給遠程機器,再把結果返回

這種模式叫RPC:遠程過程調用

發送方將發送的消息放在一個queue裏,由接收方取。

接收方再把執行結果放在另外一個queue裏,由發送方取

實際上,發送方把1,隨機生成的接收queue名 2,UUID值發過去了(用於確認)

客戶端:作用是發送指令

‘‘‘
RPC客戶端代碼
‘‘‘
import pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        # 建連接
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
localhost)) # 建頻道 self.channel = self.connection.channel() # 聲明隨機queue,不用了自動刪除 result = self.channel.queue_declare(exclusive=True) # 獲取隨機queue名稱 self.callback_queue = result.method.queue # 準備消費者,從隨機queue中消費信息交給on_response處理 # 聲明了我要收了 self.channel.basic_consume(self.on_response, no_ack
=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): # 自己發的ID和客戶端返回的ID一樣才執行 # 只有這個相等,才說明這個結果是我發過去的命令的結果。Perfect! if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response
= None # 生成的是唯一字符串,每次都不一樣,將這個發給服務器端 self.corr_id = str(uuid.uuid4()) # 發消息到rpc_queue裏,可以看出,發的時候參數裏把回調queue名給服務器了 # 這樣服務器就知道向哪個queue裏發,帶的參數是靈活的queue,服務器計算結果放到該queue self.channel.basic_publish(exchange=‘‘, routing_key=rpc_queue, properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id ), body=str(n)) # 發了消息就該收結果了,但沒有寫start_consuming方法(該方法會阻塞), # 這裏self.connection.process_data_events()相當於非阻塞型的start_consuming # self.response初始化為None while self.response is None: self.connection.process_data_events() # 收到消息就會觸發on_response函數 print("no message...") time.sleep(0.5) # 這裏其實可以不sleep,可以再發個命令。後發的這個可能先執行,不用UUID就不知道哪個是哪個 return int(self.response) fibonacci_rpc = FibonacciRpcClient() print("Requesting fib(30)") response = fibonacci_rpc.call(30) print("response:", response)

服務器端:接收指令並把處理結果返回

‘‘‘
RPC服務器端代碼
‘‘‘
import pika

# 建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(localhost))
# 建頻道
channel = connection.channel()
# 聲明queue,先啟動服務器,客戶端就不用聲明rpc_queue了
channel.queue_declare(queue=rpc_queue)
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
    n = int(body)
    print("fib({0}):".format(n))
    response = fib(n)
    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         # 這個是客戶端發過來的correlation_id,再發回去
                         correlation_id=props.correlation_id
                     ),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=rpc_queue)
print("Awaiting RPC requests")
channel.start_consuming()

運行結果:

‘‘‘
先啟動服務端:
Awaiting RPC requests

再啟動客戶端:
Requesting fib(30)
no message...
no message...
no message...
no message...
response: 832040

再看服務器端:
Awaiting RPC requests
fib(30):

‘‘‘

RabbitMQ遠程執行任務RPC。