[原始碼分析] 訊息佇列 Kombu 之 mailbox
阿新 • • 發佈:2021-03-26
# [原始碼分析] 訊息佇列 Kombu 之 mailbox
## 0x00 摘要
本系列我們介紹訊息佇列 Kombu。Kombu 的定位是一個相容 AMQP 協議的訊息佇列抽象。通過本文,大家可以瞭解 Kombu 中的 mailbox 概念,順便可以把之前幾篇文章內容再次梳理下。
## 0x01 示例程式碼
本文例項程式碼來自 https://liqiang.io/post/celery-source-analysis-remote-manager-control,深表感謝。
示例程式碼分為兩部分˛
Node可以理解為廣播Consumer。Client可以認為是廣播發起者。
### 1.1 Node
```python
import sys
import kombu
from kombu import pidbox
hostname = "localhost"
connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="direct")
node = mailbox.Node(hostname, state={"a": "b"})
node.channel = connection.channel()
def callback(body, message):
print(body)
print(message)
def main(arguments):
consumer = node.listen(callback=callback)
try:
while True:
print('Consumer Waiting')
connection.drain_events()
finally:
consumer.cancel()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
```
### 1.2 client
```python
import sys
import kombu
from kombu import pidbox
def callback():
print("callback")
def main(arguments):
connection = kombu.Connection('redis://localhost:6379')
mailbox = pidbox.Mailbox("testMailbox", type="direct")
bound = mailbox(connection)
bound._broadcast("print_msg", {'msg': 'Message for you'})
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
```
## 0x02 核心思路
廣播功能是利用了Redis的 pubSub 機制完成。
### 2.1 Redis PubSub
為了支援訊息多播,Redis單獨使用了一個模組來支援訊息多播,也就是PubSub。
Redis作為訊息釋出和訂閱之間的伺服器,起到橋樑的作用,在Redis裡面有一個channel的概念,也就是頻道,釋出者通過指定釋出到某個頻道,只要有訂閱者訂閱了該頻道,該訊息就會發送給訂閱者。
消費者可以啟動多個,PubSub會保證它們收到的都是相同的訊息序列。
### 2.2 概述
在 Kombu 的 mailbox 實現中,分為 Consumer 和 Producer兩部分。
Consumer 模組,在 Kombu 的 Channel 類中,當註冊 listener 時候,實際就是利用了 redis 驅動的 PubSub功能,把 consumer 註冊訂閱到了一個 key 上。從而 Consumer 的 queue 和 回撥函式 就通過 Channel 與 redis 聯絡起來。這樣後續就可以從 Redis 讀取訊息。
```python
psubscribe, client.py:3542
_subscribe, redis.py:664
_register_LISTEN, redis.py:322
get, redis.py:375
drain_events, base.py:960
drain_events, connection.py:318
main, nod