pika和kombu實現rpc程式碼
阿新 • • 發佈:2020-08-11
pika不支援多執行緒的rpc,雖然可以用多程序做到,但程序比執行緒更要耗費資源,而且多程序支援的也並不是很好,會出現偶發的異常。
https://stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame
根據文章的指引,最快的方式就是放棄pika模組,換成kombu。
示例
文件
Kombu模組的文件地址:https://docs.celeryproject.org/projects/kombu/en/stable/
安裝
pip install kombu
程式碼
服務端任然採用舊的pika模組的程式碼,因為服務端不需要多執行緒去執行命令,單執行緒即可,客戶端程式碼更換為kombu模組。
- 服務端程式碼
#!/usr/bin/python3 # _*_ coding: utf-8 _*_ """ @Software: PyCharm @File: RPCCmd.py @Author: 高留柱 @E-mail: [email protected] @Time: 2020/7/6 15:49 """ import pika import uuid import json class CMD: def __init__(self): """ 初始化函式的時候就建立管道,接收服務端的任務結果 """ credentials = pika.PlainCredentials('qpm', 'cljslrl0620') self.connection = pika.BlockingConnection(pika.ConnectionParameters( "it.sucheon.com", credentials=credentials, virtual_host="qpm")) self.channel = self.connection.channel() # 建立隨機管道,用於告訴服務端,任務的結果放在這個隨機管道中 result = self.channel.queue_declare('', exclusive=True) self.callback_queue = result.method.queue # 從隨機管道中取任務 self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, # 回撥函式 auto_ack=True, ) # 收到任務結果的回撥函式 def on_response(self, ch, method, props, body): # 如果客戶端的隨機字串和服務端傳送過來的隨機字串相等,就代表著該結果屬於該任務 if self.corr_id == props.correlation_id: self.response = body def call(self, cmd, port): """ :param cmd: :return: exchange: 交換器 routing_key: 是管道的名字 reply_to: 告訴服務端執行完命令把結果丟到哪個管道中 """ # TODO: 根據port查詢UUID self.response = None self.corr_id = str(uuid.uuid4()) # 唯一識別符號, 用於標識服務端的結果和客戶端命令之間的聯絡,防止服務端和客戶端命令和結果不對等 self.channel.basic_publish(exchange="", routing_key=str(port), properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(cmd)) # 最多等待10秒,10秒內有值立刻返回 self.connection.process_data_events(time_limit=30) # 檢查佇列中有沒有新訊息,沒加time_limit代表不會阻塞,加了之後會進入阻塞態 if self.response is None: # 如果服務端沒有返回值的話,將刪除任務管道,以免積累訊息,但會導致服務端指令碼停止堵塞態,結束執行 # 如果不刪除任務管道的話,也沒啥大問題,就是當服務端重新連線rabbitMQ的時候,會把之前沒接收到的命令全部執行一遍,但接收結果的管道並不會積壓 self.channel.queue_delete(queue=str(port)) return {"status": 1, "stdout": "", "stderr": "連線超時,請重試!".encode('utf-8')} return json.loads(self.response)
- 客戶端程式碼
pika替換為kombu,實現多執行緒執行命令,且限制命令超時時間為10秒,無響應後終止命令執行。
#!/usr/bin/python3 # _*_ coding: utf-8 _*_ """ @Software: PyCharm @File: rpc_kombu.py @Author: 高留柱 @E-mail: [email protected] @Time: 2020/8/6 15:35 @Notes: """ import subprocess import json import os import time from kombu import Producer, Queue, Connection from kombu.mixins import ConsumerProducerMixin from concurrent.futures import ThreadPoolExecutor # 開啟執行緒池,提供多使用者同時操作 pool = ThreadPoolExecutor(5) def execute_cmd(cmd): timeout = 10 try: p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, preexec_fn=os.setsid, shell=True, close_fds=True) p.wait(timeout) out = p.stdout.read().decode("utf-8").strip("\t\r\n ") err = p.stderr.read().decode("utf-8").strip("\t\r\n ") return json.dumps({'status': int(p.returncode), 'stdout': out, 'stderr': err}) except Exception as e: print(e) return json.dumps({'status': 1, 'stdout': '', 'stderr': str(e)}) finally: try: p.stdout.flush() p.stderr.flush() p.stdout.close() p.stderr.close() p.kill() os.killpg(p.pid, subprocess.signal.SIGKILL) except: pass def getPort(): port = json.loads(execute_cmd("/bin/echo $PORT")).get('stdout') if not port: port = json.loads(execute_cmd('/bin/hostname')).get('stdout').split('-')[-1] return port # 佇列的名字 port = getPort() rpc_queue = Queue(port) class Worker(ConsumerProducerMixin): def __init__(self, connection): self.connection = connection def task(self, message): cmd = message.payload # 獲取訊息內容 result = execute_cmd(cmd) self.producer.publish( body=result, exchange='', routing_key=message.properties['reply_to'], correlation_id=message.properties['correlation_id'], # serializer='json', # 序列化器 retry=False, expiration=3, # 設定訊息到期時間,單位是秒 ) message.ack() def get_consumers(self, Consumer, channel): return [Consumer( queues=[rpc_queue], on_message=self.on_request, accept={'application/json'}, # prefetch_count=1, no_ack=True, # 自動確認訊息,為True自動確認收到訊息,為False或不寫的話,不會自動確認訊息,訊息執行失敗,下次重啟還能收到該訊息 )] def on_request(self, message): try: pool.submit(self.task, message) # t_obj = Thread(target=self.task, args=(message,)) # t_obj.start() except Exception as e: print(e) def Main(): try: with Connection('amqp://qpm:[email protected]/qpm') as conn: Worker(conn).run() except Exception as e: print(e) if __name__ == '__main__': Main()