[原始碼解析] 並行分散式框架 Celery 之 worker 啟動 (1)
阿新 • • 發佈:2021-03-30
# [原始碼解析] 並行分散式框架 Celery 之 worker 啟動 (1)
[toc]
## 0x00 摘要
Celery是一個簡單、靈活且可靠的,處理大量訊息的分散式系統,專注於實時處理的非同步任務佇列,同時也支援任務排程。Celery 是呼叫其Worker 元件來完成具體任務處理。
```shell
$ celery --app=proj worker -l INFO
$ celery -A proj worker -l INFO -Q hipri,lopri
$ celery -A proj worker --concurrency=4
$ celery -A proj worker --concurrency=1000 -P eventlet
$ celery worker --autoscale=10,0
```
所以我們本文就來講解 worker 的啟動過程。
## 0x01 Celery的架構
前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。
[[原始碼分析\] 訊息佇列 Kombu 之 mailbox](https://www.cnblogs.com/rossiXYZ/p/14455431.html)
[[原始碼分析\] 訊息佇列 Kombu 之 Hub](https://www.cnblogs.com/rossiXYZ/p/14455294.html)
[[原始碼分析\] 訊息佇列 Kombu 之 Consumer](https://www.cnblogs.com/rossiXYZ/p/14455093.html)
[[原始碼分析\] 訊息佇列 Kombu 之 Producer](https://www.cnblogs.com/rossiXYZ/p/14455186.html)
[[原始碼分析\] 訊息佇列 Kombu 之 啟動過程](https://www.cnblogs.com/rossiXYZ/p/14454934.html)
[[原始碼解析\] 訊息佇列 Kombu 之 基本架構](https://www.cnblogs.com/rossiXYZ/p/14454761.html)
以及
[原始碼解析 並行分散式框架 Celery 之架構 (2)](https://www.cnblogs.com/rossiXYZ/p/14562308.html)
[[原始碼解析] 並行分散式框架 Celery 之架構 (2)](https://www.cnblogs.com/rossiXYZ/p/14562310.html)
下面我們再回顧下 Celery 的結構。Celery的架構圖如下所示:
```python
+-----------+ +--------------+
| Producer | | Celery Beat |
+-------+---+ +----+---------+
| |
| |
v v
+-------------------------+
| Broker |
+------------+------------+
|
|
|
+-------------------------------+
| | |
v v v
+----+-----+ +----+------+ +-----+----+
| Exchange | | Exchange | | Exchange |
+----+-----+ +----+------+ +----+-----+
| | |
v v v
+-----+ +-------+ +-------+
|queue| | queue | | queue |
+--+--+ +---+---+ +---+---+
| | |
| | |
v v v
+---------+ +--------+ +----------+
| worker | | Worker | | Worker |
+-----+---+ +---+----+ +----+-----+
| | |
| | |
+-----------------------------+
|
|
v
+---+-----+
| backend |
+---------+
```
## 0x02 示例程式碼
其實網上難以找到除錯Celery worker的辦法。我們可以去其原始碼看看,發現如下:
```python
# def test_worker_main(self):
# from celery.bin import worker as worker_bin
#
# class worker(worker_bin.worker):
#
# def execute_from_commandline(self, argv):
# return argv
#
# prev, worker_bin.worker = worker_bin.worker, worker
# try:
# ret = self.app.worker_main(argv=['--version'])
# assert ret == ['--version']
# finally:
# worker_bin.worker = prev
```
所以我們可以模仿來進行,使用如下啟動worker,進行除錯。
```python
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task()
def add(x, y):
return x + y
if __name__ == '__main__':
app.worker_main(argv=['worker'])
```
## 0x03 邏輯概述
當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel, 這個連線可以有多個channel。然後,worker就會去borker的佇列裡面取相應的task來進行消費了,這也是典型的消費者生產者模式。
這個worker主要是有四部分組成的,task_pool, consumer, scheduler, mediator。其中,task_pool主要是用來存放的是一些worker,當啟動了一個worker,並且提供併發引數的時候,會將一些worker放在這裡面。
celery預設的併發方式是prefork,也就是多程序的方式,這裡只是celery對multiprocessing pool進行了輕量的改造,然後給了一個新的名字叫做prefork,這個pool與多程序的程序池的區別就是這個task_pool只是存放一些執行的worker。
consumer也就是消費者,主要是從broker那裡接受一些message,然後將message轉化為`celery.worker.request.Request` 的一個例項。
Celery 在適當的時候,會把這個請求包裝進Task中,Task就是用裝飾器app_celery.task()裝飾的函式所生成的類,所以可以在自定義的任務函式中使用這個請求引數,獲取一些關鍵的資訊。此時,已經瞭解了task_pool和consumer。
接下來,這個worker具有兩套資料結構,這兩套資料結構是並行執行的,他們分別是 'ET時刻表' 、就緒佇列。
就緒佇列:那些 立刻就需要執行的task, 這些task到達worker的時候會被放到這個就緒佇列中等待consumer執行。
我們下面看看如何啟動Celery。
## 0x04 Celery應用
程式首先會來到Celery類,這是Celery的應用。
可以看到主要就是:各種類名稱,TLS, 初始化之後的各種signal。
位置在:celery/app/base.py,其定義如下:
```python
class Celery:
"""Celery application."""
amqp_cls = 'celery.app.amqp:AMQP'
backend_cls = None
events_cls = 'celery.app.events:Events'
loader_cls = None
log_cls = 'celery.app.log:Logging'
control_cls = 'celery.app.control:Control'
task_cls = 'celery.app.task:Task'
registry_cls = 'celery.app.registry:TaskRegistry'
#: Thread local storage.
_local = None
_fixups = None
_pool = None
_conf = None
_after_fork_registered = False
#: Signal sent when app is loading configuration.
on_configure = None
#: Signal sent after app has prepared the configuration.
on_after_configure = None
#: Signal sent after app has been finalized.
on_after_finalize = None
#: Signal sent by every new process after fork.
on_after_fork = None
```
對於我們的示例程式碼,入口是:
```python
def worker_main(self, argv=None):
if argv is None:
argv = sys.argv
if 'worker' not in argv:
raise ValueError(
"The worker sub-command must be specified in argv.\n"
"Use app.start() to programmatically start other commands."
)
self.start(argv=argv)
```
### 4.1 新增子command
celery/bin/celery.py 會進行新增 子command,我們可以看出來。
這些 Commnd 是可以在命令列作為子命令直接使用的 。
```python
celery.add_command(purge)
celery.add_command(call)
celery.add_command(beat)
celery.add_command(list_)
celery.add_command(result)
celery.add_command(migrate)
celery.add_command(status)
celery.add_command(worker)
celery.add_command(events)
celery.add_command(inspect)
celery.add_command(control)
celery.add_command(graph)
celery.add_command(upgrade)
celery.add_command(logtool)
celery.add_command(amqp)
celery.add_command(shell)
celery.add_command(multi)
```
每一個都是command。我們以worker為例,具體如下:
```python
worker = {CeleryDaemonC