Rabbitmq中的RPC通訊機制
阿新 • • 發佈:2018-11-01
具體工作機制:
Our RPC will work like this:
- When the Client starts up, it creates an anonymous exclusive callback queue.
- For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
- The request is sent to an rpc_queue queue.
- The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
- The client waits for data on the callback queue. When a message appears, it checks the correlation_id
客戶端程式碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
服務端程式碼:
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
上面的這個程式是官網提供的,乍一看是不是有點懵,我們將程式進行分解:
第一步:普通的釋出者與訂閱者模型
釋出者:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import sys,os
import time
import uuid
import pika
import os,sys
import time
import uuid
class FibonaciClint(object):
def __init__(self):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
self.channel = self.conn.channel()
self.channel.queue_declare(queue='rpc_queue',exclusive=False,durable=True)
def call(self,message):
print('....開始傳送訊息....==>%s'%message)
self.uuid = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
correlation_id = self.uuid,
))
print('傳送訊息Successful...')
def close(self):
self.conn.close()
if __name__ == '__main__':
fibonaci = FibonaciClint()
message = ''.join(sys.argv[1:]) or 'Hello,RabbitMQ.'
fibonaci.call(message)
訂閱者:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import sys,os
import time
import uuid
class FibonaciServer(object):
def __init__(self):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
self.channel = self.conn.channel()
def fib(self,n): # 定義一個主邏輯:斐波那契數列.===>程式的處理邏輯在這裡寫.
if n == 0:
return 0
elif n == 1:
return 1
else:
return self.fib(n - 1) + self.fib(n - 2)
def on_request(self,channel,method,properties,body):
print('----------------------------------------')
print('正在消費的訊息:====>%s'%body)
time.sleep(5)
print('訊息的相關屬性為:')
print(properties)
self.channel.basic_ack(delivery_tag=method.delivery_tag)
print('----------------------------------------')
def call_back(self):
self.channel.basic_qos(prefetch_count=2)
self.channel.basic_consume(consumer_callback=self.on_request,
queue='rpc_queue',
no_ack=False)
def start_consume(self):
self.channel.start_consuming()
if __name__ == '__main__':
fibonaci = FibonaciServer()
fibonaci.call_back()
fibonaci.start_consume()
執行資訊:
D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/consumer.py"
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=4e0ec5e9-44c3-45c9-8a5c-686e34bf2785', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=2d82b7e9-7e3a-4495-b40a-dcd334b983ee', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=55ebcbdd-2f90-4c15-80b6-33bd90f05105', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=fd6ff3c2-230c-44ac-9d7d-da8474de5bd6', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=c4ad05cf-a1b4-4aec-a209-5a653b5154af', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=854a8b6c-7c36-457c-bda6-29c53f5646f8', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=433625d0-eed8-4546-ba5b-d41256fcb53c', 'delivery_mode=2'])>
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'Hello,RabbitMQ.'
訊息的相關屬性為:
<BasicProperties(['correlation_id=8bd94f42-2a5c-439c-9082-5883bfdb8481', 'delivery_mode=2'])>
----------------------------------------
第二步:增加部分屬性資訊(回撥佇列的名字作為訊息的屬性資訊傳送)
釋出者:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import sys,os
import time
import uuid
import pika
import os,sys
import time
import uuid
class FibonaciClint(object):
def __init__(self):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
self.channel = self.conn.channel()
self.channel.queue_declare(queue='rpc_queue',exclusive=False,durable=True)
#TODO 增加回調佇列(回撥佇列的名字作為屬性進行傳送.)
result = self.channel.queue_declare(durable=True,exclusive=False)
self.call_queue = result.method.queue
def call(self,message):
print('....開始傳送訊息....==>%s'%message)
self.uuid = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
correlation_id = self.uuid,
reply_to=self.call_queue, #增加一個屬性.
))
print('傳送訊息Successful...')
def close(self):
self.conn.close()
if __name__ == '__main__':
fibonaci = FibonaciClint()
message = ''.join(sys.argv[1:]) or '3'
fibonaci.call(message)
訂閱者:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import sys,os
import time
import uuid
class FibonaciServer(object):
def __init__(self):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
self.channel = self.conn.channel()
def fib(self,n): # 定義一個主邏輯:斐波那契數列.===>程式的處理邏輯在這裡寫.
if n == 0:
return 0
elif n == 1:
return 1
else:
return self.fib(n - 1) + self.fib(n - 2)
def on_request(self,channel,method,properties,body):
print('----------------------------------------')
print('正在消費的訊息:====>%s'%body)
time.sleep(5)
print('訊息的相關屬性為:')
print(properties)
value = self.fib(int(body))
print('原值: ',body,'斐波那契的執行結果: ',value)
self.channel.basic_ack(delivery_tag=method.delivery_tag)
print('----------------------------------------')
def call_back(self):
self.channel.basic_qos(prefetch_count=2)
self.channel.basic_consume(consumer_callback=self.on_request,
queue='rpc_queue',
no_ack=False)
def start_consume(self):
self.channel.start_consuming()
if __name__ == '__main__':
fibonaci = FibonaciServer()
fibonaci.call_back()
fibonaci.start_consume()
執行相關資訊:
===========>
D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/consumer.py"
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=b67e4865-0c5b-4d6a-bca0-32a9f0f26357', 'delivery_mode=2', 'reply_to=amq.gen-FaEBXSQ5llJzoK3lisTMJQ'])>
原值: b'3' 斐波那契的執行結果: 2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=db3cd6c4-ab01-43e5-8ede-86e9aa1b3bf9', 'delivery_mode=2', 'reply_to=amq.gen-vffPvlsxRGFwMHcjLAU2kg'])>
原值: b'3' 斐波那契的執行結果: 2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=94b1103e-aa62-43b5-8f63-f6c645e1265d', 'delivery_mode=2', 'reply_to=amq.gen-B4m6KcbtYPrlqOveQtnZaQ'])>
原值: b'3' 斐波那契的執行結果: 2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=efde1f20-765a-4c1b-bf25-d4b8a00e8c97', 'delivery_mode=2', 'reply_to=amq.gen-E5t24uT0lEON92GsZ0y7HA'])>
原值: b'3' 斐波那契的執行結果: 2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'1'
訊息的相關屬性為:
<BasicProperties(['correlation_id=3ed9af6a-6154-4877-ab6a-3b4a97c00c59', 'delivery_mode=2', 'reply_to=amq.gen-LJ5QpG1YwG5kq2XL5RnTTQ'])>
原值: b'1' 斐波那契的執行結果: 1
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'2'
訊息的相關屬性為:
<BasicProperties(['correlation_id=380efb79-8d95-4e32-902f-64c5619f3cc0', 'delivery_mode=2', 'reply_to=amq.gen-rbp6hADtBC7CU0dJ3_iawA'])>
原值: b'2' 斐波那契的執行結果: 1
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=ab363df4-ac91-4643-8cd7-50e977297131', 'delivery_mode=2', 'reply_to=amq.gen-pVsKxifFexmpneFDquTrSw'])>
原值: b'3' 斐波那契的執行結果: 2
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'4'
訊息的相關屬性為:
<BasicProperties(['correlation_id=e6e75d07-7d55-4dcc-aa99-00d7ebea5967', 'delivery_mode=2', 'reply_to=amq.gen-O8kTGUggp2YxIsEs7UQJCQ'])>
原值: b'4' 斐波那契的執行結果: 3
----------------------------------------
----------------------------------------
正在消費的訊息:====>b'5'
訊息的相關屬性為:
<BasicProperties(['correlation_id=d69dca66-71e6-4a2f-a4df-0a64be9a7d5b', 'delivery_mode=2', 'reply_to=amq.gen-qMEQFULAm7S6_WaIgF4tJw'])>
原值: b'5' 斐波那契的執行結果: 5
----------------------------------------
第三步:完整程式
釋出者:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import sys,os
import time
import uuid
import pika
import os,sys
import time
import uuid
class FibonaciClint(object):
def __init__(self):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
self.channel = self.conn.channel()
self.channel.queue_declare(queue='rpc_queue',exclusive=False,durable=True)
#TODO 增加回調佇列(回撥佇列的名字作為屬性進行傳送.)
result = self.channel.queue_declare(durable=True,exclusive=False)
self.call_queue = result.method.queue
#TODO 增加消費訊息部分程式碼
self.channel.basic_consume(consumer_callback=self.on_response,
queue=self.call_queue,
no_ack=False)
def on_response(self,channel,method,properties,body): #注意:這裡是相應結果
print('RPC 正在獲取RPC響應結果:...')
if self.uuid == properties.correlation_id:
self.response = str(body)
print('相關屬性資訊:')
print(properties)
print('獲取到的執行結果是:%s'%self.response)
self.channel.basic_ack(delivery_tag=method.delivery_tag)
def call(self,message):
self.response = None
self.uuid = str(uuid.uuid4())
print('....開始傳送訊息....==>%s' % message)
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
correlation_id = self.uuid,
reply_to=self.call_queue, #增加一個屬性.
))
print('傳送訊息Successful...')
while self.response is None:
time.sleep(3)
print('正在等待RPC呼叫結果....')
self.conn.process_data_events() #相當於非阻塞消費
return self.response
def close(self):
self.conn.close()
if __name__ == '__main__':
fibonaci = FibonaciClint()
message = ''.join(sys.argv[1:]) or '3'
fibonaci.call(message)
接受者:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import pika
import sys,os
import time
import uuid
class FibonaciServer(object):
def __init__(self):
self.conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',port=5672))
self.channel = self.conn.channel()
def fib(self,n): # 定義一個主邏輯:斐波那契數列.===>程式的處理邏輯在這裡寫.
if n == 0:
return 0
elif n == 1:
return 1
else:
return self.fib(n - 1) + self.fib(n - 2)
def on_request(self,channel,method,properties,body):
print('----------------------------------------')
print('正在消費的訊息:====>%s'%body)
time.sleep(5)
print('訊息的相關屬性為:')
print(properties)
value = self.fib(int(body))
print('原值: ',body,'斐波那契的執行結果: ',value)
print('將計算的執行結果返回給RPC客戶端....')
self.channel.basic_publish(exchange='',
routing_key=properties.reply_to,
body=str(value),
properties=pika.BasicProperties(
delivery_mode=2,
correlation_id=properties.correlation_id,
))
self.channel.basic_ack(delivery_tag=method.delivery_tag)
print('----------------------------------------')
def call_back(self):
self.channel.basic_qos(prefetch_count=2)
self.channel.basic_consume(consumer_callback=self.on_request,
queue='rpc_queue',
no_ack=False)
def start_consume(self):
self.channel.start_consuming()
if __name__ == '__main__':
fibonaci = FibonaciServer()
fibonaci.call_back()
fibonaci.start_consume()
執行資訊:
===>
D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/producer.py"
....開始傳送訊息....==>3
傳送訊息Successful...
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
正在等待RPC呼叫結果....
RPC 正在獲取RPC響應結果:...
相關屬性資訊:
<BasicProperties(['correlation_id=a86099bf-eff3-4835-989f-a20c2e240188', 'delivery_mode=2'])>
獲取到的執行結果是:b'2'
Process finished with exit code 0
===>
D:\Python34\python.exe "D:/Python Work Location/RabbitMQStudy/test/consumer.py"
----------------------------------------
正在消費的訊息:====>b'3'
訊息的相關屬性為:
<BasicProperties(['correlation_id=a86099bf-eff3-4835-989f-a20c2e240188', 'delivery_mode=2', 'reply_to=amq.gen-T84YRrsaktV_HWQdjqOWgw'])>
原值: b'3' 斐波那契的執行結果: 2
將計算的執行結果返回給RPC客戶端....
----------------------------------------
OK,分解之後是不是容易理解多了.