# [Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (2)
阿新 • Published: 2021-04-02
[toc]
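## 0x00 Summary
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. Celery relies on its Worker component to carry out the actual task processing.
The previous article covered the first half of the worker startup process; this article analyzes the second half.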
Other related articles:

- [[Source Code Analysis] Message Queue Kombu: mailbox](https://www.cnblogs.com/rossiXYZ/p/14455431.html)
- [[Source Code Analysis] Message Queue Kombu: Hub](https://www.cnblogs.com/rossiXYZ/p/14455294.html)
- [[Source Code Analysis] Message Queue Kombu: Consumer](https://www.cnblogs.com/rossiXYZ/p/14455093.html)
- [[Source Code Analysis] Message Queue Kombu: Producer](https://www.cnblogs.com/rossiXYZ/p/14455186.html)
- [[Source Code Analysis] Message Queue Kombu: Startup Process](https://www.cnblogs.com/rossiXYZ/p/14454934.html)
- [[Source Code Analysis] Message Queue Kombu: Basic Architecture](https://www.cnblogs.com/rossiXYZ/p/14454761.html)

as well as:

- [[Source Code Analysis] Parallel Distributed Framework Celery: Architecture (2)](https://www.cnblogs.com/rossiXYZ/p/14562308.html)
- [[Source Code Analysis] Parallel Distributed Framework Celery: Architecture (2)](https://www.cnblogs.com/rossiXYZ/p/14562310.html)
- [[Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (1)](https://www.cnblogs.com/rossiXYZ/p/14563763.html)
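## 0x01 Recap
As the previous article showed, after a series of steps we finally arrive at the Worker logic. In this article we continue with the rest of the startup process: the worker as a program.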
```
                                  +----------------------+
      +----------+                |   @cached_property   |
      |   User   |                |        Worker        |
      +----+-----+           +--->|                      |
           |                 |    |                      |
           |  worker_main    |    |  Worker application  |
           |                 |    |  celery/app/base.py  |
           v                 |    +----------------------+
 +---------+------------+    |
 |        Celery        |    |
 |                      |    |
 |  Celery application  |    |
 |  celery/app/base.py  |    |
 |                      |    |
 +---------+------------+    |
           |                 |
           |  celery.main    |
           |                 |
           v                 |
 +---------+------------+    |
 | @click.pass_context  |    |
 |        celery        |    |
 |                      |    |
 |                      |    |
 |    CeleryCommand     |    |
 | celery/bin/celery.py |    |
 |                      |    |
 +---------+------------+    |
           |                 |
           |                 |
           |                 |
           v                 |
 +---------+------------+    |
 | @click.pass_context  |    |
 |        worker        |    |
 |                      |    |
 |                      |    |
 |    WorkerCommand     |    |
 | celery/bin/worker.py |    |
 +---------+------------+    |
           |                 |
           +-----------------+
```
To make the discussion easier to follow, here is the final flow chart up front:
![](https://img2020.cnblogs.com/blog/1850883/202103/1850883-20210321200854832-1138498285.png)
## 0x02 Worker as a program
The worker here is the actual business core of Celery, so it deserves a thorough look.
The code now moves to celery/apps/worker.py.
```python
class Worker(WorkController):
    """Worker as a program."""
```
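Instantiating it calls `__init__` of the base class WorkController.
Initialization essentially consists of:
- the loader loading the various configurations;
- setup_defaults setting default values;
- setup_instance doing the actual construction, including configuring the queues that hold messages;
- the Blueprint creating the Worker's internal sub-modules.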
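The initialization order in the list above follows a template-method pattern: the base class fixes the sequence of steps, and the program-level Worker subclass only overrides hooks such as on_before_init and on_after_init. The following is a minimal sketch under that reading; `MiniWorkController` and `MiniWorker` are hypothetical classes that merely record which step ran, and the default concurrency of 4 is arbitrary. They are not Celery's API.

```python
class MiniWorkController:
    """Hypothetical stand-in for WorkController: fixes the init order."""

    def __init__(self, **kwargs):
        self.calls = []                        # records the order of steps
        self.calls.append('loader.init_worker')
        self.on_before_init(**kwargs)          # hook, no-op by default
        self.setup_defaults(**kwargs)          # fill in default settings
        self.on_after_init(**kwargs)           # hook, no-op by default
        self.setup_instance(**kwargs)          # build the instance (pool, blueprint, ...)

    def on_before_init(self, **kwargs):
        pass

    def on_after_init(self, **kwargs):
        pass

    def setup_defaults(self, concurrency=None, **kwargs):
        self.calls.append('setup_defaults')
        # Arbitrary hypothetical default of 4 when no value is passed.
        self.concurrency = concurrency if concurrency is not None else 4

    def setup_instance(self, **kwargs):
        self.calls.append('setup_instance')


class MiniWorker(MiniWorkController):
    """Hypothetical stand-in for the program-level Worker: overrides hooks only."""

    def on_before_init(self, **kwargs):
        self.calls.append('on_before_init')

    def on_after_init(self, **kwargs):
        self.calls.append('on_after_init')


w = MiniWorker(concurrency=2)
print(w.calls)
# ['loader.init_worker', 'on_before_init', 'setup_defaults', 'on_after_init', 'setup_instance']
```

Keeping the sequence in the base class means a subclass cannot accidentally reorder loader setup and instance construction; it can only add behavior at the hook points.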
The code is in celery/worker/worker.py.
```python
from datetime import datetime

from celery import bootsteps
from celery.utils.nodenames import default_nodename


class WorkController:
    """Unmanaged worker instance."""

    app = None
    pidlock = None
    blueprint = None
    pool = None
    semaphore = None

    #: contains the exit code if a :exc:`SystemExit` event is handled.
    exitcode = None

    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""

        name = 'Worker'
        default_steps = {
            'celery.worker.components:Hub',
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer',
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
        }

    def __init__(self, app=None, hostname=None, **kwargs):
        self.app = app or self.app                  # set the app attribute
        self.hostname = default_nodename(hostname)  # build the node's hostname
        self.startup_time = datetime.utcnow()
        self.app.loader.init_worker()               # call app.loader.init_worker()
        self.on_before_init(**kwargs)               # pre-initialization hook
        self.setup_defaults(**kwargs)               # set default values
        self.on_after_init(**kwargs)                # post-initialization hook
        self.setup_instance(**self.prepare_args(**kwargs))  # build the instance
```
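default_steps is an unordered set, so the Blueprint has to derive a start order itself; Celery does this from the `requires` declared on each step. The sketch below shows the general idea with a plain depth-first topological sort. The step names and the `REQUIRES` map are assumptions for illustration (they do not reproduce Celery's exact dependencies), and `start_order` is a hypothetical helper, not Celery's bootsteps code.

```python
# Hypothetical dependency map: step name -> names of steps it requires.
REQUIRES = {
    'Timer': [],
    'Hub': ['Timer'],
    'Pool': ['Hub'],
    'Autoscaler': ['Pool'],
    'StateDB': [],
    'Beat': [],
    'Consumer': ['Pool'],
}


def start_order(requires):
    """Return step names so that every step comes after its requirements."""
    order, visiting, done = [], set(), set()

    def visit(name):
        if name in done:
            return
        if name in visiting:
            raise ValueError(f'circular dependency at {name!r}')
        visiting.add(name)
        for dep in requires.get(name, []):
            visit(dep)                 # start requirements first
        visiting.remove(name)
        done.add(name)
        order.append(name)

    for name in sorted(requires):      # sorted() only makes the result deterministic
        visit(name)
    return order


print(start_order(REQUIRES))
# ['Timer', 'Hub', 'Pool', 'Autoscaler', 'Beat', 'Consumer', 'StateDB']
```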
At this point the init_worker method of app.loader is called.
### 2.1 loader
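Here app.loader is the loader attribute set up when the Celery app is initialized; by default it is celery.loaders.app:AppLoader. Its job is to import the various configurations.
The `loader` property itself is defined on the Celery class in celery/app/base.py: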
```python
@cached_property
def loader(self):
    """Current loader instance."""
    return get_loader_cls(self.loader_cls)(app=self)
```
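Because `loader` is a `cached_property`, the loader is built on first access and then reused for the lifetime of the app. A minimal sketch using the standard library's `functools.cached_property` (Python 3.8+), which behaves the same way for this purpose; `MiniApp` and `DummyLoader` are hypothetical, and `loader_cls` is a class here rather than a string that get_loader_cls would first resolve.

```python
from functools import cached_property


class DummyLoader:
    """Hypothetical loader; counts how many times it is constructed."""
    created = 0

    def __init__(self, app):
        self.app = app
        DummyLoader.created += 1


class MiniApp:
    """Hypothetical app with a lazily created loader."""
    loader_cls = DummyLoader

    @cached_property
    def loader(self):
        # Runs only on first access; the result is then stored on the instance.
        return self.loader_cls(app=self)


app = MiniApp()
print(DummyLoader.created)                   # 0: nothing built yet
first = app.loader
second = app.loader
print(DummyLoader.created, first is second)  # 1 True
```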
get_loader_cls (in celery/loaders/__init__.py) looks like this:
```python
def get_loader_cls(loader):
    """Get loader class by name/alias."""
    return symbol_by_name(loader, LOADER_ALIASES, imp=import_from_cwd)
```
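get_loader_cls accepts either an alias or a full `module:attribute` path and turns it into a class object. A simplified re-creation of that lookup with `importlib` follows; `resolve_symbol` and `ALIASES` are hypothetical stand-ins for `symbol_by_name` and `LOADER_ALIASES`, they ignore the `imp=import_from_cwd` argument, and they resolve standard-library names so the example runs without Celery installed.

```python
import importlib
import os

# Hypothetical alias table, similar in shape to Celery's LOADER_ALIASES.
ALIASES = {
    'od': 'collections:OrderedDict',
    'dumps': 'json:dumps',
}


def resolve_symbol(name, aliases=None):
    """Resolve an alias or 'package.module:attr' string to the object it names."""
    if not isinstance(name, str):
        return name                            # already an object: pass it through
    name = (aliases or {}).get(name, name)     # expand an alias if there is one
    module_name, sep, attr = name.partition(':')
    if not sep:                                # 'pkg.mod.attr' form without a colon
        module_name, _, attr = name.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


print(resolve_symbol('od', ALIASES))                   # <class 'collections.OrderedDict'>
print(resolve_symbol('os.path:join') is os.path.join)  # True
```

Passing a string around instead of a class lets the loader be chosen from configuration or an environment variable without importing it up front.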
The loader instance here is an AppLoader, and its init_worker method (inherited from BaseLoader in celery/loaders/base.py) is called next:
```python
def init_worker(self):
    if not self.worker_initialized:     # the worker has not been initialized yet
        self.worker_initialized = True  # mark it as initialized
        self.import_default_modules()   # import the default modules
        self.on_worker_init()
```
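The worker_initialized flag makes init_worker idempotent: no matter how often it is called, the default modules are imported and on_worker_init runs only once. A minimal sketch with a hypothetical `MiniLoader` whose import step only counts calls:

```python
class MiniLoader:
    """Hypothetical loader illustrating the once-only init_worker guard."""

    worker_initialized = False

    def __init__(self):
        self.import_calls = 0
        self.init_hook_calls = 0

    def import_default_modules(self):
        self.import_calls += 1           # stands in for the real module imports

    def on_worker_init(self):
        self.init_hook_calls += 1        # hook that subclasses may override

    def init_worker(self):
        if not self.worker_initialized:  # only the first call does any work
            self.worker_initialized = True
            self.import_default_modules()
            self.on_worker_init()


loader = MiniLoader()
loader.init_worker()
loader.init_worker()
print(loader.import_calls, loader.init_hook_calls)  # 1 1
```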
import_default_modules is shown below; its main job is to import the modules that the app configuration asks for:
```python
def import_default_modules(self):
    responses = signals.import_modules.send(sender=self.app)
    # Prior to this point loggers are not yet set up properly, need to
    # check responses manually and reraised exceptions if any, otherwise
    # they'll be silenced, making it incredibly difficult to debug.
    for _, response in responses:       # re-raise any receiver exception
        if isinstance(response, Exception):
            raise response
    # import the modules the project needs, as listed in the app configuration
    return [self.import_task_module(m) for m in self.default_modules]
```
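As the source comment explains, an exception raised by a signal receiver comes back as a response value instead of propagating, so the loader has to scan the responses and re-raise it. The sketch below mimics that flow; `send_signal` is a hypothetical stand-in for `signals.import_modules.send`, and `importlib.import_module` replaces `self.import_task_module`.

```python
import importlib


def send_signal(receivers, sender):
    """Hypothetical Signal.send: call receivers, collect results or exceptions."""
    responses = []
    for receiver in receivers:
        try:
            responses.append((receiver, receiver(sender=sender)))
        except Exception as exc:         # swallowed here, returned as a value
            responses.append((receiver, exc))
    return responses


def import_default_modules(receivers, modules, sender=None):
    for _, response in send_signal(receivers, sender):
        if isinstance(response, Exception):
            raise response               # surface errors that would otherwise be silent
    return [importlib.import_module(m) for m in modules]


def ok_receiver(sender):
    return 'ok'


def bad_receiver(sender):
    raise RuntimeError('receiver failed')


print([m.__name__ for m in import_default_modules([ok_receiver], ['json', 'math'])])
# ['json', 'math']
```

Calling `import_default_modules([ok_receiver, bad_receiver], ['json'])` raises the `RuntimeError` from `bad_receiver` before any module is imported.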
### 2.2 setup_defaults in worker
Next, we look at the self.setup_defaults method called during Worker initialization, which assigns values to the parameters needed at runtime.
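setup_defaults generally takes each parameter from the explicit argument if one was passed and otherwise falls back to the app configuration; in Celery this goes through the app's `either` helper, e.g. `either('worker_concurrency', concurrency)`. A minimal sketch of that fallback; `either` here is a hypothetical module-level function that takes the config dict explicitly, and `CONF` and `MiniSettings` are made up for illustration.

```python
# Hypothetical configuration, standing in for app.conf.
CONF = {'worker_concurrency': 8, 'worker_prefetch_multiplier': 4}


def either(conf, key, *values):
    """Return the first value that is not None, else conf[key] (None if missing)."""
    for value in values:
        if value is not None:
            return value
    return conf.get(key)


class MiniSettings:
    """Hypothetical holder mirroring the shape of setup_defaults."""

    def setup_defaults(self, concurrency=None, prefetch_multiplier=None, **_kw):
        self.concurrency = either(CONF, 'worker_concurrency', concurrency)
        self.prefetch_multiplier = either(
            CONF, 'worker_prefetch_multiplier', prefetch_multiplier)


s = MiniSettings()
s.setup_defaults(concurrency=2)
print(s.concurrency, s.prefetch_multiplier)  # 2 4
```

Testing against `None` rather than truthiness means an explicit `0` or empty value from the command line still overrides the configuration.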
After this, self.pool_cls's