[原始碼分析] 訊息佇列 Kombu 之 啟動過程
阿新 • • 發佈:2021-03-04
# [原始碼分析] 訊息佇列 Kombu 之 啟動過程
## 0x00 摘要
本系列我們介紹訊息佇列 Kombu。Kombu 的定位是一個相容 AMQP 協議的訊息佇列抽象。通過本文,大家可以瞭解 Kombu 是如何啟動,以及如何搭建一個基本的架子。
因為之前有一個綜述,所以大家會發現,一些概念講解文字會同時出現在後續文章和綜述之中。
## 0x01 示例
下面使用如下程式碼來進行說明。
本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。
```python
def main(arguments):
hub = Hub()
exchange = Exchange('asynt_exchange')
queue = Queue('asynt_queue', exchange, 'asynt_routing_key')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print(' kombu ')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(3, p_message)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
```
## 0x02 啟動
讓我們順著程式流程看看Kombu都做了些什麼,也可以對 Kombu 內部有所瞭解。
本文關注的重點是:Connection,Channel 和 Hub 是如何聯絡在一起的。
### 2.1 Hub
在程式開始,我們建立了Hub。
Hub的作用是建立訊息Loop,但是此時尚未建立,因此只是一個靜態例項。
```python
hub = Hub()
```
其定義如下:
```python
class Hub:
"""Event loop object.
Arguments:
timer (kombu.asynchronous.Timer): Specify custom timer instance.
"""
def __init__(self, timer=None):
self.timer = timer if timer is not None else Timer()
self.readers = {}
self.writers = {}
self.on_tick = set()
self.on_close = set()
self._ready = set()
self._running = False
self._loop = None
self.consolidate = set()
self.consolidate_callback = None
self.propagate_errors = ()
self._create_poller()
```
因為此時沒有建立loop,所以目前重要的步驟是建立Poll,其Stack如下:
```python
_get_poller, eventio.py:321
poll, eventio.py:328
_create_poller, hub.py:113
__init__, hub.py:96
main, testU