RabbitMQ 實現削峰填谷
阿新 • • 發佈:2021-08-10
import threading, time import pika class SingletonClass(object): """單例模式用來少建立連線""" # 加鎖,防止併發較高時,同時建立物件,導致建立多個物件 _singleton_lock = threading.Lock() def __init__(self, username='baibing', password='123456', ip='47.111.87.61', port=5672, data={}): """__init__在new出來物件後例項化物件""" self.credentials = pika.PlainCredentials(username, password) self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=ip, port=port, credentials=self.credentials)) self.channel = self.connection.channel() print('連線成功') def __new__(cls): """__new__用來建立物件""" if not hasattr(SingletonClass, "_instance"): with SingletonClass._singleton_lock: if not hasattr(SingletonClass, "_instance"): SingletonClass._instance = super().__new__(cls) return SingletonClass._instance def callback(self, ch, method, properties, body): """訂閱者的回撥函式,可以在這裡面做操作,比如釋放庫存等""" print("郵箱", body.decode()) # 在秒殺活動中,這裡來對資料進行平滑的處理 time.sleep(0.8) ch.basic_ack(delivery_tag=method.delivery_tag) # 手動ack機制, def connection_close(self): """關閉連線""" self.connection.close() def consuming_start(self): """等待訊息""" self.channel.start_consuming() def this_publisher(self, email, queue_name='HELLOP'): """釋出者 email:訊息 queue_name:佇列名稱 """ # 1、建立一個名為python-test的交換機 durable=True 代表exchange持久化儲存 self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic') # self.channel.queue_declare(queue=queue_name) # 2、訂閱釋出模式,向名為python-test的交換機中插入使用者郵箱地址email,delivery_mode = 2 宣告訊息在佇列中持久化,delivery_mod = 1 訊息非持久化 self.channel.basic_publish(exchange='python1', routing_key='#user#', body=email, properties=pika.BasicProperties(delivery_mode=2) ) print("佇列{}傳送使用者郵箱{}到MQ成功".format(queue_name, email)) # 3. 關閉連線 self.connection_close() def this_subscriber(self, queue_name='HELLOP', prefetch_count=10): """訂閱者 queue_name:佇列名稱 prefetch_count:限制未處理訊息的最大值,ack未開啟時生效 """ # 建立臨時佇列,佇列名傳空字元,consumer關閉後,佇列自動刪除 result = self.channel.queue_declare('', durable=True, exclusive=True) # 限制未處理訊息的最大值 這個值就是你資料庫承受的併發量 self.channel.basic_qos(prefetch_count=5) # 宣告exchange,由exchange指定訊息在哪個佇列傳遞,如不存在,則建立。durable = True 代表exchange持久化儲存,False 非持久化儲存 self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic') # 繫結exchange和佇列 exchange 使我們能夠確切地指定訊息應該到哪個佇列去 self.channel.queue_bind(exchange='python1', queue=result.method.queue, routing_key='#.anonymous.#') self.channel.basic_consume( result.method.queue, self.callback, # 回撥地址(函式) auto_ack=False # 流量削峰 auto_ack必須為false 手動來ack ) # 等待訊息 self.consuming_start() if __name__ == '__main__': obj1 = SingletonClass() print(id(obj1)) obj1.this_subscriber()
原文連結:https://blog.csdn.net/qq_42874635/article/details/116268306
先複製過來稍後整理