1. 程式人生 > >[原始碼分析] 訊息佇列 Kombu 之 mailbox

[原始碼分析] 訊息佇列 Kombu 之 mailbox

# [原始碼分析] 訊息佇列 Kombu 之 mailbox ## 0x00 摘要 本系列我們介紹訊息佇列 Kombu。Kombu 的定位是一個相容 AMQP 協議的訊息佇列抽象。通過本文,大家可以瞭解 Kombu 中的 mailbox 概念,順便可以把之前幾篇文章內容再次梳理下。 ## 0x01 示例程式碼 本文例項程式碼來自 https://liqiang.io/post/celery-source-analysis-remote-manager-control,深表感謝。 示例程式碼分為兩部分˛ Node可以理解為廣播Consumer。Client可以認為是廣播發起者。 ### 1.1 Node ```python import sys import kombu from kombu import pidbox hostname = "localhost" connection = kombu.Connection('redis://localhost:6379') mailbox = pidbox.Mailbox("testMailbox", type="direct") node = mailbox.Node(hostname, state={"a": "b"}) node.channel = connection.channel() def callback(body, message): print(body) print(message) def main(arguments): consumer = node.listen(callback=callback) try: while True: print('Consumer Waiting') connection.drain_events() finally: consumer.cancel() if __name__ == '__main__': sys.exit(main(sys.argv[1:])) ``` ### 1.2 client ```python import sys import kombu from kombu import pidbox def callback(): print("callback") def main(arguments): connection = kombu.Connection('redis://localhost:6379') mailbox = pidbox.Mailbox("testMailbox", type="direct") bound = mailbox(connection) bound._broadcast("print_msg", {'msg': 'Message for you'}) if __name__ == '__main__': sys.exit(main(sys.argv[1:])) ``` ## 0x02 核心思路 廣播功能是利用了Redis的 pubSub 機制完成。 ### 2.1 Redis PubSub 為了支援訊息多播,Redis單獨使用了一個模組來支援訊息多播,也就是PubSub。 Redis作為訊息釋出和訂閱之間的伺服器,起到橋樑的作用,在Redis裡面有一個channel的概念,也就是頻道,釋出者通過指定釋出到某個頻道,只要有訂閱者訂閱了該頻道,該訊息就會發送給訂閱者。 消費者可以啟動多個,PubSub會保證它們收到的都是相同的訊息序列。 ### 2.2 概述 在 Kombu 的 mailbox 實現中,分為 Consumer 和 Producer兩部分。 Consumer 模組,在 Kombu 的 Channel 類中,當註冊 listener 時候,實際就是利用了 redis 驅動的 PubSub功能,把 consumer 註冊訂閱到了一個 key 上。從而 Consumer 的 queue 和 回撥函式 就通過 Channel 與 redis 聯絡起來。這樣後續就可以從 Redis 讀取訊息。 ```python psubscribe, client.py:3542 _subscribe, redis.py:664 _register_LISTEN, redis.py:322 get, redis.py:375 drain_events, base.py:960 drain_events, connection.py:318 main, nod