[原始碼解析] 並行分散式框架 Celery 之架構 (2)
阿新 • • 發佈:2021-03-27
# [原始碼解析] 並行分散式框架 Celery 之架構 (2)
[toc]
## 0x00 摘要
Celery是一個簡單、靈活且可靠的,處理大量訊息的分散式系統,專注於實時處理的非同步任務佇列,同時也支援任務排程。
本系列將通過原始碼分析,和大家一起深入學習 Celery。本文是系列第二篇,繼續探究 Celery 架構。
## 0x01 上文回顧
前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。
[[原始碼分析\] 訊息佇列 Kombu 之 mailbox](https://www.cnblogs.com/rossiXYZ/p/14455431.html)
[[原始碼分析\] 訊息佇列 Kombu 之 Hub](https://www.cnblogs.com/rossiXYZ/p/14455294.html)
[[原始碼分析\] 訊息佇列 Kombu 之 Consumer](https://www.cnblogs.com/rossiXYZ/p/14455093.html)
[[原始碼分析\] 訊息佇列 Kombu 之 Producer](https://www.cnblogs.com/rossiXYZ/p/14455186.html)
[[原始碼分析\] 訊息佇列 Kombu 之 啟動過程](https://www.cnblogs.com/rossiXYZ/p/14454934.html)
[[原始碼解析\] 訊息佇列 Kombu 之 基本架構](https://www.cnblogs.com/rossiXYZ/p/14454761.html)
上文 [[原始碼解析] 並行分散式框架 Celery 之架構 (1)](https://www.cnblogs.com/rossiXYZ/p/14562308.html) 中,我們大致介紹了 Celery 的概念,用途和架構,現在回憶 Celery 的架構圖如下:
```python
+-----------+ +--------------+
| Producer | | Celery Beat |
+-------+---+ +----+---------+
| |
| |
v v
+-------------------------+
| Broker |
+------------+------------+
|
|
|
+-------------------------------+
| | |
v v v
+----+-----+ +----+------+ +-----+----+
| Exchange | | Exchange | | Exchange |
+----+-----+ +----+------+ +----+-----+
| | |
v v v
+-----+ +-------+ +-------+
|queue| | queue | | queue |
+--+--+ +---+---+ +---+---+
| | |
| | |
v v v
+---------+ +--------+ +----------+
| worker | | Worker | | Worker |
+-----+---+ +---+----+ +----+-----+
| | |
| | |
+-----------------------------+
|
|
v
+---+-----+
| backend |
+---------+
```
下面我們從幾個方面繼續分析。
## 0x02 worker的思考
當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel, 這個連線可以有多個channel。然後,worker就會去borker的佇列裡面取相應的task來進行消費了,這也是典型的消費者生產者模式。
### 2.1 worker的模式
首先,我們思考下worker 的工作模式,即,這些併發的 worker 彼此之間是什麼關係?是否需要一個master 來統一調控?為了更好的對比,我們先看看nginx的實現。
#### 2.1.1 Nginx模式
nginx 後臺程序包含一個master程序和多個worker程序。
- master程序主要用來管理worker程序,包含:接收來自外界的訊號,向各worker程序傳送訊號,監控worker程序的執行狀態,當worker程序退出後(異常情況下),會自動重新啟動新的worker程序。
- worker程序則處理基本的網路事件。多個worker程序之間是對等的,他們同等競爭來自客戶端的請求,各程序互相之間是獨立的。一個請求 只可能在一個worker程序中處理,一個worker程序,不可能處理其它程序的請求。
worker程序之間是平等的,每個程序處理請求的機會也是一樣的。一個連線請求過來,每個程序都有可能處理這個連線,怎麼做到的呢?
- 首先,每個worker程序都是從master程序fork過來,在master程序裡面,先建立好需要listen的socket(listenfd)之後,然後再fork出多個worker程序。
- 其次,所有worker程序的listenfd會在新連線到來時變得可讀,為保證只有一個程序處理該連線,所有worker程序在註冊listenfd讀事件前搶accept_mutex,搶到互斥鎖的那個程序註冊listenfd讀事件,在讀事件裡呼叫accept接受該連線。
- 最後,當一個worker程序在accept這個連線之後,就開始讀取請求,解析請求,處理請求,產生資料後,再返回給客戶端,最後才斷開連線,這樣一個完整的請求就是這樣的了。
我們可以看到,一個請求完全由worker程序來處理,而且只在一個worker程序中處理。
#### 2.1.2 Celery 模式
##### 2.1.2.1 模式
與 Nginx不同,在 Celery 之中,沒有 master 程序。所有的都是worker 程序。大家都在 redis 之上等待新任務。
但是,每一個worker內部,父程序和子程序內部,卻又是 master - slave 模式,也就是我們常說的主從。
- master(就是父程序)負責任務的獲取,分發,slaves 的管理(建立,增加,關閉,重啟,丟棄等等),其他輔助模組的維護等等。
- slave(就是子程序)負責消費從排程器傳遞過來的任務。
worker內部 具體流程如下:
- 排程器首先**預生成**(prefork)一些工作程序,做為一個**程序池**(mutiprocessing-pool),之後通過**事件驅動**(select/poll/epoll)的方式,監聽核心的事件(讀、寫、異常等等)。
- 如果master監聽到就執行對應的回撥,源源不斷的從 **中間人**(broker)那裡提取任務,並通過 **管道**(pipe)作為程序間通訊的方式,運用一系列的路由策略(round-robin、weight 等等)交給slave。
- slave工作程序 消費(ack)任務,再通過管道向排程器進行狀態同步(sync),程序間通訊等等行為。
- 這個 workloop 其實很明顯,就是監聽讀管道的資料(主程序從這個管道的另一端寫),然後執行對應的回撥,期間會呼叫 put 方法,往寫管道同步狀態(主程序可以從管道的另一端讀這個資料)。
具體如下圖:
```python
+-------------+
| |
| Redis |
| |
+---+------+--+
^ ^
| |
| |
+-----------------------+ +--------------+
| |
| |
| |
+------------------+--------------------+ +-----------+--------+
| Worker 1 | | Worker n |
| | | |
| | | |
| Parent process | | Parent process |
| + | | + |
| | | | | |
| | | | | |
| +--------+------------+ | | | |
| | | | | | |
| | | | | | |
| v v | | v |
| subprocess 1 ... subprocess n | ... | subprocess |
| | | |
+---------------------------------------+ +--------------------+
```
##### 2.1.2.2 互動
在 Celery 中,採用的是分散式的管理方式,每個節點之間都是通過廣播/單播進行通訊,從而達到協同效果。
在處理具體控制管理工作時候,worker 程序之間有交流,具體分為兩種:
- 啟動時候使用 Mingle 模組來互相交換資訊。
- 執行狀態下,通過 gossip 協議進行狀態的共享。但是這個狀態共享對於任務的分配和worker 的排程沒有必然的聯絡,只是用來監控和響應控制檯訊息。因為假如有若干 worker,面對一個控制檯訊息,應該只有一個 worker 來響應其訊息,所以就利用 gossip 協議選舉出一個 leader,這個 leader 進行響應。
在處理具體業務工作時候,worker 之間沒有交流。
當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel,一個連線可以有多個channel。然後,worker就會去borker的佇列裡面取相應的task來進行消費了,這也是典型的消費者生產者模式。
以 redis 為例,底層 Kombu 事實上是使用 redis 的 BRPOP 功能來完成對具體 queue 中訊息的讀取。
如果多個 worker 同時去使用 brpop 獲取 broker 訊息,那麼具體哪一個能夠讀取到訊息,其實這就是有一個 競爭機制,因為redis 的單程序處理,所以只能有一個 worker 才能讀到。
### 2.2 worker 組成
在 worker 文件中提到:worker主要由四部分組成的:task_pool, consumer, scheduler, mediator。
這四部分依賴下面兩組資料結構工作。
- 就緒佇列:那些 立刻就需要執行的task, 這些task到達worker的時候會被放到這個就緒佇列中等待consumer執行。
- ETA:是那些有ETA引數,或是rate_limit引數的 task。這些 task 被放入 timer 佇列中,timer 負責在條件合適的情況下,把這些 task 放入執行pool。
但是實際上,mediator 在程式碼中沒有發現。也許是 mediator 成了預設功能而非元件。
#### 2.2.1 task_pool
task_pool主要是用來存放的是一些worker。當啟動了一個worker,並且提供併發引數的時候,會將一些worker放在這裡面。
celery預設的併發方式是prefork,也就是多程序的方式,這裡只是celery對`multiprocessing.Pool`進行了輕量的改造,然後給了一個新的名字叫做prefork。
這個pool與多程序的程序池的區別就是這個task_pool只是存放一些執行的worker。
#### 2.2.2 consumer
consumer也就是消費者, 主要是從broker那裡接受一些message。然後將message轉化為`celery.worker.request.Request` 的一個例項。並且在適當的時候,會把這個請求包裝進Task中。
Task就是用裝飾器 `app_celery.task()` 裝飾的函式所生成的類,所以可以在自定義的任務函式中使用這個請求引數,獲取一些關鍵的資訊。
#### 2.2.3 Scheduler
對於 Scheduler,可以從 Beat 和 Timer 兩個方面講述。
##### 2.2.3.1 Beat
Celery Beat:任務排程器,Beat程序會讀取配置檔案的內容,週期性地將配置中到期需要執行的任務傳送給任務佇列。
其中樞部分就是 **Scheduler**,**Service** 是驅動部分,最後的承載實體就是 **SchedulerEntry**。
其內部主要資料結構是一個最小堆,它的作用就是承載了所有我們設定得定時任務,而最小堆的特性就是堆頂的元素是最小的,排序的依據是時間差值。celery 會先計算每個定時任務下一次執行的時間戳 - 當前時間戳,然後根據這個時間差值進行排序,毫無疑問,差值最小的就是下一次需要執行的任務。
在 Service 的 start 函式中,會呼叫 scheduler.tick(),從而在內部最小堆中獲取下次一需要執行的任務。將 `SchedulerEntry` 轉換為 `Task`,傳送到 redis 的佇列中。
具體定義如下:
```python
class Scheduler:
"""Scheduler for periodic tasks.
"""
Entry = ScheduleEntry
#: The schedule dict/shelve.
schedule = None
#: Maximum time to sleep between re-checking the schedule.
max_interval = DEFAULT_MAX_INTERVAL
#: How often to sync the schedule (3 minutes by default)
sync_every = 3 * 60
#: How many tasks can be called before a sync is forced.
sync_every_tasks = None
_last_sync = None
_tasks_since_sync = 0
```
**持久化**
在 Celery 中,定時任務的執行並不會因為我們重啟了 Celery 而失效,反而在重啟 Celery 之後,Celery 會根據上一次關閉之前的執行狀態,重新計算新的執行週期,而這裡計算的前提就是能夠獲取舊的執行資訊,而在 Scheduler 中,這些資訊都是預設儲存在檔案中的。
Celery 預設的儲存是通過 Python 預設的 shelve 庫實現的,shelve 是一個類似於字典物件的資料庫,我們可以通過呼叫 `sync` 命令在磁碟和記憶體中同步資料。
##### 2.2.3.2 Timer
文件中對於 Timer 的描述如下:
> The timer schedules internal functions, like cleanup and internal monitoring,
> but also it schedules ETA tasks and rate limited tasks.
> If the scheduled tasks ETA has passed it is moved to the execution pool.
可以看到,其功能就是排程內部的函式,比如清理和監控,也排程ETA tasks and rate limited tasks。
對於清理,有比如 backend.process_cleanup 和 loader.on_process_cleanup。
### 2.3 初始化過程
worker初始化過程中,各個模組的執行順序是由一個BluePrint類定義,並且根據各個模組之間的依賴進行排序執行。
*Worker* 的 *start* 方法中,其實就是執行了一個 *self.blueprint* 的 *start* 方法,這裡面的 blueprint,是 celery 自己實現的一個 **有向無環圖**(DAG)的資料結構,其功能簡單描述下就是:根據命令列傳入的不同引數,初始化不同的元件(step),並執行這些元件的初始化方法。其實就是一個對流程控制的面向物件的封裝。
每個 Step 的具體的功能如下:
- Timer:用於執行定時任務的 Timer;
- Hub:Eventloop 的封裝物件;
- Pool:構造各種執行池(執行緒/程序/協程);
- Autoscaler:用於自動增長或者 pool 中工作單元;
- StateDB:持久化 worker 重啟區間的資料(只是重啟);
- Autoreloader:用於自動載入修改過的程式碼;
- Beat:建立 Beat 程序,不過是以子程序的形式執行(不同於命令列中以 beat 引數執行);
## 0x03 Consumer的思考
Celery 使用 Consumer 來從 broker 獲取訊息。
### 3.1 元件
Consumer 的元件如下:
- 【1】Connection:管理和 broker 的 Connection 連線
- 【3】Events:用於傳送監控事件
- 【2】Agent:`cell` actor
- 【2】Mingle:不同 worker 之間同步狀態用的
- 【1】Tasks:啟動訊息 Consumer
- 【3】Gossip:消費來自其他 worker 的事件
- 【1】Heart:傳送心跳事件(consumer 的心跳)
- 【3】Control:遠端命令管理服務
在參考文章 [1: Worker 啟動流程概述](https://liqiang.io/post/celery-source-analysis-worker-start-flow?lang=ZH_CN) 中提到:
> 這裡我對所有的 Bootstep 都做了標號處理,標號的大小說明了這些服務對於我們程式碼閱讀的重要程式,1 最重要,3 最不緊要。對於 Consumer 來說,
>
> 1 是基本功能,這些功能組成了一個簡單的非強壯的訊息佇列框架;
>
> 2 一般重要,可以實現一個高階一點的功能;
>
> 3 屬於附加功能,同時也屬於一點分散式的功能。
### 3.2 作用
因此,我們可以看到,celery Consumer 元件的概念遠遠要大於Kombu的Consumer,不只是從broker取得訊息,也包括訊息的消費,分發,監控,心跳等一系列功能。
可以說,除了訊息迴圈引擎 被 hub 承擔,多程序被 Pool,Autoscaler 承擔,定時任務被 timer,beat 承擔之外,其他主要功能都被 Consumer 承擔 。
## 0x04 高效能的思考
celery 的高效能主要靠兩個方面來保證,**一個是多程序**,**一個是事件驅動**。此外在一些具體功能實現方面也確保了高效能的實現。
### 4.1 多程序
多程序可以良好的發揮每個核的計算能力。可以在一定程度上提升程式的併發能力,緩解 IO 的壓力。
Celery 的方案叫做 prefork,也就是預生成。預生成指的是,主程序在執行具體的業務邏輯之前,先提前 fork 出來一堆子程序,並把他們存起來集中管理,形成一個程序池。平常的時候這些子程序都是 休眠(asleep) 狀態,只有當主程序派發任務的時候,會喚醒(awake)其中的一個子程序,並通過程序間通訊的手段,向子程序傳輸相應的任務資料。
如前所述,每一個worker內部,父程序和子程序內部,是 master - slave 模式。
- master(就是父程序)負責任務的獲取,分發,slaves 的管理(建立,增加,關閉,重啟,丟棄等等),其他輔助模組的維護等等。
- 排程器首先**預生成**(prefork)一些工作程序(slave),做為一個**程序池**(mutiprocessing-pool),之後通過**事件驅動**(select/poll/epoll)的方式,監聽核心的事件(讀、寫、異常等等)。
- slave(就是子程序)負責消費從排程器傳遞過來的任務。再通過管道向排程器進行狀態同步(sync),程序間通訊等等行為。
### 4.2 事件驅動
Kombu內部使用了事件驅動 。
Master 排程器是一個事件驅動模型,什麼是事件驅動,其實就是它消滅了阻塞。
正常的單執行緒模型,一次只能拿一條訊息,每一次都要走一條來和回的鏈路,並且需要一個 while True 的迴圈不斷的去檢測,這樣無疑是非常低效且開銷大的。
而事件驅動則不這樣,他可以同時傳送多個檢測的訊號,然後就直接掛起,等待核心進行提示,有提示再去執行對應的回撥。這樣既優雅的化解了單執行緒每次都要檢測的 while True,又通過多次請求併發降低了重複鏈路。
以 epoll 為例:
- epoll可以同時支援水平觸發和邊緣觸發(Edge Triggered,只告訴程序哪些檔案描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麼它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的效能要更高一些。
- epoll同樣只告知那些就緒的檔案描述符,而且當我們呼叫epoll_wait()獲得就緒檔案描述符時,返回的不是實際的描述符,而是一個代表 就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體對映(mmap)技術,這樣便徹底省掉了 這些檔案描述符在系統呼叫時複製的開銷。
- 另一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,程序只有在呼叫一定的方法後,核心才對所有監視的檔案描 述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個檔案描述符,一旦基於某個檔案描述符就緒時,核心會採用類似callback的回撥 機制,迅速啟用這個檔案描述符,當程序呼叫epoll_wait()時便得到通知。
### 4.3 Task的實現
Task 承載的功能就是在 Celery 應用中,啟動對應的訊息消費 者。
關於 Task 的實現,這就涉及問題:究竟是分發程式碼還是分發資料?
因為 Celery 是一個通用性功能,不是特定面對大資料,所以分發資料是不可避免的。
剩下問題就是是否需要分發程式碼?
Task 任務最基本的形式就是函式,任務釋出最直接的想法就是client將要執行的相關函式程式碼打包,釋出到broker。分散式計算框架spark就是使用這種方式。
#### 4.3.1 分發程式碼
業界分發程式碼的代表是 Spark。Spark的思想比較簡單:挪計算不挪資料。那怎麼去描述這個計算呢?Spark 通過RDD封裝一個針對資料對應關係記錄,在這個封裝之上來記錄計算。這就涉及到兩個最重要的問題:
- 如何拆分計算邏輯;
- 如何分發計算邏輯;
於是 Spark 把所有的計算邏輯劃分為這兩種型別:
- 能夠分發到各個節點上並行執行的;
- 需要經過一定量的結果合併之後才能繼續執行的;
然後把**一個巨大的問題拆分成相對獨立的子問題分發到各個機器上求解。**
在實際提交時候,Spark把計算程式碼提交到每個工作節點上然後進行計算。
#### 4.3.2 Celery 模式
2.0之前的celery也支援這種任務釋出的方式。這種方式顯而易見的一個壞處是傳遞給broker的資料量可能會比較大。解決的辦法也很容易想到,就是把要釋出的任務相關的程式碼,提前告訴worker。
這就是 全域性集合 和 註解註冊的作用。
```python
@app.task(name='hello_task')
def hello():
print('hello')
```
其中的app是worker中的application,通過裝飾器的方式,對任務函式註冊。
app會維護一個字典,key是任務的名字,也就是這裡的`hello_task`,value是這個函式的記憶體地址。任務名必須唯一,但是任務名這個引數不是必須的,如果沒有給這個引數,celery會自動根據包的路徑和函式名生成一個任務名。
通過上面這種方式,client釋出任務只需要提供任務名以及相關引數,不必提供任務相關程式碼:
```python
# client端
app.send_task('hello_task')
```
這裡需要注意:client釋出任務後,任務會以一個訊息的形式寫入broker佇列,帶有任務名稱等相關引數,等待worker獲取。這裡任務的釋出,是完全獨立於worker端的,即使worker沒有啟動,訊息也會被寫入佇列。
這種方式也有顯而易見的壞處,所有要執行的任務程式碼都需要提前在worker端註冊好,client端和worker端的耦合變強了。
### 4.4 Prefetch
目前 Kombu QoS 只是支援 prefetch_count。
設定 prefetch_count 的目的是:
- **Prefetch**指的是一個Celery Worker節點,能夠提前獲取一些還**還未被其他節點執行**的任務,這樣可以提高Worker節點的執行效率。
- 同時也可以通過設定Qos的prefetch count來控制consumer的流量,防止消費者從佇列中一下拉取所有訊息,從而導致擊穿服務,導致服務崩潰或異常。
Kombu qos prefetch_count 是一個整數值N,表示的意思就是一個消費者最多隻能一次拉取N條訊息,一旦N條訊息沒有處理完,就不會從佇列中獲取新的訊息,直到有訊息被ack。
Kombu 中,會記錄 prefetch_count的值,同時記錄的還有該channel dirty (acked/rejected) 的訊息個數。
### 4.5 Celery函式
Celery 還提供了一些工作流功能,其中某些功能可以讓我們提高效能。比如 Chunks 功能。
任務塊函式能夠讓你將需要處理的大量物件分為分成若干個任務塊,如果你有一百萬個物件,那麼你可以建立 10 個任務塊,每個任務塊處理十萬個物件。有些人可能會擔心,分塊處理會導致並行效能下降,實際上,由於避免了訊息傳遞的開銷,因此反而會大大的提高效能。
```python
add_chunks_sig = add.chunks(zip(range(100), range(100)), 10)
result = add_chunks_sig.delay()
result.get()
```
## 0x05 分散式的思考
我們從負載均衡,容災恢復,worke之間互動這三個角度來看看 Celery 如何實現分散式。
### 5.1 負載均衡
Celery 的負載均衡其實可以分為三個層次,而且是與 Kombu 高度耦合(本文 broker 以 Redis 為例)。
- 在 worker 決定 與 哪幾個 queue 互動,有一個負載均衡;
- 在 worker 決定與 broker 互動,使用 brpop 獲取訊息時候有一個負載均衡;
- 在 worker 獲得 broker 訊息之後,內部 具體 呼叫 task 時候,worker 內部進行多程序分配時候,有一個負載均衡。
另外,Celery 還有一個 AutoScaler 元件,其作用 實際就是線上調節程序池大小。這也和緩解負載相關。
其主要邏輯大致如下圖所示(後續文章中會有詳細講解):
```python
+
Kombu | |Redis
|
BRPOP(keys) |
+------------------------------------------+ |
| Worker 1 | ------------------+ |
| | | |
+------------------------------------------+ | | queue 1 key
| |
| |
+------------------------------------------+ BRPOP(keys) | |
| Worker 2 | +---------------------------> queue 2 key
| | | |
+------------------------------------------+ | |
| |
+------------------------------------------+ | | queue 3 key
| Worker 3 | | |
| | | |
| +-----------+ | | |
| | queue 1 | | BRPOP(keys) | |
| | queue 2 | keys | | |
| | ...... | +--------+---------------------------------+ |
| | queue n | ^ | |
| +-----------+ | | |
| | | |
| + | |
| | |
| + round_robin_cycle | |
| | | |
+------------------------------------------+ |
| |
| fair_strategy |
| |
+-------+----------+----------------+ |
| | | |
v v v |
+-----+--------+ +------+-------+ +-----+--------+ |
| subprocess 1 | | subprocess 2 | | subprocess 3 | +
+--------------+ +--------------+ +--------------+
```
### 5.2 failover 容災恢復
#### 5.2.1 錯誤種類&失敗維度
Celery 之中,錯誤主要有3種:
- 使用者程式碼錯誤:錯誤可以直接返回應用,因為Celery無法知道如何處理;
- Broker錯誤:Celery可以根據負載平衡策略嘗試下一個節點;
- 網路超時錯誤:Celery可以重試該請求;
從系統角度出發,幾個最可能的失敗維度如下:
- Broker失敗;
- Worker ---> Broker 這個鏈路會失敗;
- Worker 節點會失敗;
- Worker 中的多程序中,某一個程序本身失效;
- Worker 的某一個程序中,內部處理任務失敗;
從實際處理看,broker可以使用 RabbitMQ,可以做 叢集和故障轉移;但這是涉及到整體系統設計的維度,所以本系列不做分析。
#### 5.2.2 處理方法
依據錯誤級別,錯誤處理 分別有 重試 與 fallback選擇 兩種。
我們以 Worker ---> Broker 維度為例來進行分析。此維度上主要關心的是:
1. Broker 某一個節點失效;
2. worker 與 Broker 之間網路失效;
在這個維度上,無論是 Celery 還是 Kombu 都做了努力,但是從根本來說,還是 Kombu 的努力。
##### 5.2.2.1 重試
在 Celery 中,對於重試,有 broker_connection_max_retries 配置,就是最大重試次數。
當出現網路故障時候,Celery 會根據 broker_connection_max_retries 配置來進行重試。
在 Komub 中,同樣做了 各種 重試 處理,比如 在 Connection.py 中有如下重試引數:
- max_retries:最大重試次數;
- errback (Callable):失敗回撥策略;
- callback (Callable):每次重試間隔的回撥函式;
##### 5.2.2.2 自動重試
自動重試是 kombu 的另外一種重試途徑,比如在 kombu\connection.py 就有 autoretry,其基本套路是:
- 在呼叫fun時候,可以使用 autoretry 這個mapper 做包裝。並且可以傳入上次呼叫成功的 channel。
- 如果呼叫fun過程中失敗,kombu 會自動進行try。
##### 5.2.2.3 fallback
如果重試不解決問題,則會使用 fallback。比如 broker_failover_strategy 是 Celery 針對 broker Connection 來設定的策略。會自動對映到 `kombu.connection.failover_strategies`。
Kombu 在配置 Connection的時候,可以設定多個 broker url,在連線 broker 的時候,kombu 自動會選取最健康的 broker 節點進行連線。
### 5.3 Worker之間的互動
前面提到,在處理具體控制管理工作時候,在執行狀態下,worker 程序之間通過 gossip 協議進行狀態的共享。
但是這個狀態共享對於任務的分配和worker 的排程沒有必然的聯絡,只是用來監控和響應控制檯訊息。因為假如有若干 worker,面對一個控制檯訊息,應該只有一個 worker 來響應其訊息,所以就利用 gossip 協議選舉出一個 leader,這個 leader 進行響應。
Gossip 協議跟其他協議一樣,也有一些不可避免的缺陷,主要是兩個:
**1)訊息的延遲**
由於 Gossip 協議中,節點只會隨機向少數幾個節點發送訊息,訊息最終是通過多個輪次的散播而到達全網的,因此使用 Gossip 協議會造成不可避免的訊息延遲。不適合用在對實時性要求較高的場景下。
**2)訊息冗餘**
Gossip 協議規定,節點會定期隨機選擇周圍節點發送訊息,而收到訊息的節點也會重複該步驟,因此就不可避免的存在訊息重複傳送給同一節點的情況,造成了訊息的冗餘,同時也增加了收到訊息的節點的處理壓力。而且,由於是定期傳送,因此,即使收到了訊息的節點還會反覆收到重複訊息,加重了訊息的冗餘。
為什麼用 gossip?可能因為是用 gossip 來處理管理功能,就是在 workers 之中選出一個 leader 來響應控制檯的訊息。這樣就不需要對訊息即時性有要求。
## 0x06 總結
通過以上的分析,大家應該對 Celery 的架構有了初步的瞭解。從下文開始,我們逐一分析 Celery 的幾個方面,敬請期待。
## 0xFF 參考
[Nginx資料之Master與Worker基礎概念](https://blog.csdn.net/sky6even/article/details/81409800)
[1: Worker 啟動流程概述](https://liqiang.io/post/celery-source-analysis-worker-start-flow?lang=ZH_CN)
[2: Worker 的執行引擎](https://liqiang.io/post/celery-source-analysis-worker-execute-engine?lang=ZH_CN)
[3: Task 物件的實現](https://liqiang.io/post/celery-source-analysis-task-object-implement?lang=ZH_CN)
[4: 定時任務的實現](https://liqiang.io/post/celery-source-analysis-scheduler-task-implements?lang=ZH_CN)
[5: 遠端控制管理](https://liqiang.io/post/celery-source-analysis-remote-manager-control?lang=ZH_CN)
[6: Events 的實現](https://liqiang.io/post/celery-source-analysis-event-implemention?lang=ZH_CN)
[7: Worker 之間的互動](https://liqiang.io/post/celery-source-analysis-interact-between-workers?lang=ZH_CN)
[8: State 和 Result](https://liqiang.io/post/state-and-result-implement-in-celery?lang=ZH_CN)
[Spark分散式計算引擎的應用](https://zhuanlan.zhihu.com/p/63