1. 程式人生 > 實用技巧 >pika和kombu實現rpc程式碼

pika和kombu實現rpc程式碼

pika不支援多執行緒的rpc,雖然可以用多程序做到,但程序比執行緒更要耗費資源,而且多程序支援的也並不是很好,會出現偶發的異常。

https://stackoverflow.com/questions/49154404/pika-threaded-execution-gets-error-505-unexpected-frame

根據文章的指引,最快的方式就是放棄pika模組,換成kombu。

示例

文件
Kombu模組的文件地址:https://docs.celeryproject.org/projects/kombu/en/stable/

安裝

pip install kombu

程式碼
服務端任然採用舊的pika模組的程式碼,因為服務端不需要多執行緒去執行命令,單執行緒即可,客戶端程式碼更換為kombu模組。

  • 服務端程式碼
#!/usr/bin/python3
# _*_ coding: utf-8 _*_

"""
@Software: PyCharm
@File: RPCCmd.py
@Author: 高留柱
@E-mail: [email protected]
@Time: 2020/7/6 15:49
"""
import pika
import uuid
import json


class CMD:
    def __init__(self):
        """
        初始化函式的時候就建立管道,接收服務端的任務結果
        """
        credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
    "it.sucheon.com", credentials=credentials, virtual_host="qpm"))

        self.channel = self.connection.channel()

        # 建立隨機管道,用於告訴服務端,任務的結果放在這個隨機管道中
        result = self.channel.queue_declare('', exclusive=True)

        self.callback_queue = result.method.queue

        # 從隨機管道中取任務
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,  # 回撥函式
            auto_ack=True,
        )

    # 收到任務結果的回撥函式
    def on_response(self, ch, method, props, body):
        # 如果客戶端的隨機字串和服務端傳送過來的隨機字串相等,就代表著該結果屬於該任務
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, cmd, port):
        """
        :param cmd:
        :return:
        exchange: 交換器
        routing_key: 是管道的名字
        reply_to: 告訴服務端執行完命令把結果丟到哪個管道中
        """
        # TODO: 根據port查詢UUID
        self.response = None
        self.corr_id = str(uuid.uuid4())  # 唯一識別符號, 用於標識服務端的結果和客戶端命令之間的聯絡,防止服務端和客戶端命令和結果不對等
        self.channel.basic_publish(exchange="",
                                   routing_key=str(port),
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(cmd))

        # 最多等待10秒,10秒內有值立刻返回
        self.connection.process_data_events(time_limit=30)  # 檢查佇列中有沒有新訊息,沒加time_limit代表不會阻塞,加了之後會進入阻塞態
        if self.response is None:
            # 如果服務端沒有返回值的話,將刪除任務管道,以免積累訊息,但會導致服務端指令碼停止堵塞態,結束執行
            # 如果不刪除任務管道的話,也沒啥大問題,就是當服務端重新連線rabbitMQ的時候,會把之前沒接收到的命令全部執行一遍,但接收結果的管道並不會積壓
            self.channel.queue_delete(queue=str(port))
            return {"status": 1, "stdout": "", "stderr": "連線超時,請重試!".encode('utf-8')}
        return json.loads(self.response)
  • 客戶端程式碼
    pika替換為kombu,實現多執行緒執行命令,且限制命令超時時間為10秒,無響應後終止命令執行。
#!/usr/bin/python3
# _*_ coding: utf-8 _*_

"""
@Software: PyCharm
@File: rpc_kombu.py
@Author: 高留柱
@E-mail: [email protected]
@Time: 2020/8/6 15:35
@Notes:
"""

import subprocess
import json
import os
import time
from kombu import Producer, Queue, Connection
from kombu.mixins import ConsumerProducerMixin
from concurrent.futures import ThreadPoolExecutor

# 開啟執行緒池,提供多使用者同時操作
pool = ThreadPoolExecutor(5)


def execute_cmd(cmd):
    timeout = 10
    try:
        p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, preexec_fn=os.setsid, shell=True,
                             close_fds=True)
        p.wait(timeout)
        out = p.stdout.read().decode("utf-8").strip("\t\r\n ")
        err = p.stderr.read().decode("utf-8").strip("\t\r\n ")
        return json.dumps({'status': int(p.returncode), 'stdout': out, 'stderr': err})
    except Exception as e:
        print(e)
        return json.dumps({'status': 1, 'stdout': '', 'stderr': str(e)})
    finally:
        try:
            p.stdout.flush()
            p.stderr.flush()
            p.stdout.close()
            p.stderr.close()
            p.kill()
            os.killpg(p.pid, subprocess.signal.SIGKILL)
        except:
            pass

def getPort():
    port = json.loads(execute_cmd("/bin/echo $PORT")).get('stdout')
    if not port:
        port = json.loads(execute_cmd('/bin/hostname')).get('stdout').split('-')[-1]
    return port


# 佇列的名字
port = getPort()
rpc_queue = Queue(port)


class Worker(ConsumerProducerMixin):

    def __init__(self, connection):
        self.connection = connection

    def task(self, message):
        cmd = message.payload  # 獲取訊息內容
        result = execute_cmd(cmd)
        self.producer.publish(
            body=result,
            exchange='',
            routing_key=message.properties['reply_to'],
            correlation_id=message.properties['correlation_id'],
            # serializer='json',  # 序列化器
            retry=False,
            expiration=3,  # 設定訊息到期時間,單位是秒
        )
        message.ack()

    def get_consumers(self, Consumer, channel):
        return [Consumer(
            queues=[rpc_queue],
            on_message=self.on_request,
            accept={'application/json'},
            # prefetch_count=1,
            no_ack=True,  # 自動確認訊息,為True自動確認收到訊息,為False或不寫的話,不會自動確認訊息,訊息執行失敗,下次重啟還能收到該訊息
        )]

    def on_request(self, message):
        try:
            pool.submit(self.task, message)
            # t_obj = Thread(target=self.task, args=(message,))
            # t_obj.start()
        except Exception as e:
            print(e)


def Main():
    try:
        with Connection('amqp://qpm:[email protected]/qpm') as conn:
            Worker(conn).run()
    except Exception as e:
        print(e)


if __name__ == '__main__':
    Main()