1. 程式人生 > 其它 >RabbitMQ 實現削峰填谷

RabbitMQ 實現削峰填谷

import threading, time
import pika


class SingletonClass(object):
    """單例模式用來少建立連線"""
    # 加鎖,防止併發較高時,同時建立物件,導致建立多個物件
    _singleton_lock = threading.Lock()

    def __init__(self, username='baibing', password='123456', ip='47.111.87.61', port=5672, data={}):
        """__init__在new出來物件後例項化物件"""
        self.credentials = pika.PlainCredentials(username, password)
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=ip, port=port, credentials=self.credentials))
        self.channel = self.connection.channel()
        print('連線成功')

    def __new__(cls):
        """__new__用來建立物件"""
        if not hasattr(SingletonClass, "_instance"):
            with SingletonClass._singleton_lock:
                if not hasattr(SingletonClass, "_instance"):
                    SingletonClass._instance = super().__new__(cls)
        return SingletonClass._instance

    def callback(self, ch, method, properties, body):
        """訂閱者的回撥函式,可以在這裡面做操作,比如釋放庫存等"""
        print("郵箱", body.decode())
        # 在秒殺活動中,這裡來對資料進行平滑的處理
        time.sleep(0.8)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動ack機制,

    def connection_close(self):
        """關閉連線"""
        self.connection.close()

    def consuming_start(self):
        """等待訊息"""
        self.channel.start_consuming()

    def this_publisher(self, email, queue_name='HELLOP'):
        """釋出者
        email:訊息
        queue_name:佇列名稱
        """

        # 1、建立一個名為python-test的交換機 durable=True 代表exchange持久化儲存
        self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic')
        # self.channel.queue_declare(queue=queue_name)
        # 2、訂閱釋出模式,向名為python-test的交換機中插入使用者郵箱地址email,delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化
        self.channel.basic_publish(exchange='python1',
                                   routing_key='#user#',
                                   body=email,
                                   properties=pika.BasicProperties(delivery_mode=2)
                                   )

        print("佇列{}傳送使用者郵箱{}到MQ成功".format(queue_name, email))
        # 3. 關閉連線
        self.connection_close()

    def this_subscriber(self, queue_name='HELLOP', prefetch_count=10):
        """訂閱者
        queue_name:佇列名稱
        prefetch_count:限制未處理訊息的最大值,ack未開啟時生效
        """
        # 建立臨時佇列,佇列名傳空字元,consumer關閉後,佇列自動刪除
        result = self.channel.queue_declare('', durable=True, exclusive=True)
        # 限制未處理訊息的最大值 這個值就是你資料庫承受的併發量
        self.channel.basic_qos(prefetch_count=5)

        # 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存
        self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic')

        # 繫結exchange和佇列  exchange 使我們能夠確切地指定訊息應該到哪個佇列去
        self.channel.queue_bind(exchange='python1', queue=result.method.queue, routing_key='#.anonymous.#')

        self.channel.basic_consume(
            result.method.queue,
            self.callback,  # 回撥地址(函式)
            auto_ack=False  # 流量削峰 auto_ack必須為false 手動來ack
        )
        # 等待訊息
        self.consuming_start()


if __name__ == '__main__':
    obj1 = SingletonClass()
    print(id(obj1))
    obj1.this_subscriber()

原文連結:https://blog.csdn.net/qq_42874635/article/details/116268306

先複製過來稍後整理