1. 程式人生 > >[原始碼解析] 並行分散式框架 Celery 之架構 (2)

[原始碼解析] 並行分散式框架 Celery 之架構 (2)

# [原始碼解析] 並行分散式框架 Celery 之架構 (2) [toc] ## 0x00 摘要 Celery是一個簡單、靈活且可靠的,處理大量訊息的分散式系統,專注於實時處理的非同步任務佇列,同時也支援任務排程。 本系列將通過原始碼分析,和大家一起深入學習 Celery。本文是系列第二篇,繼續探究 Celery 架構。 ## 0x01 上文回顧 前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。 [[原始碼分析\] 訊息佇列 Kombu 之 mailbox](https://www.cnblogs.com/rossiXYZ/p/14455431.html) [[原始碼分析\] 訊息佇列 Kombu 之 Hub](https://www.cnblogs.com/rossiXYZ/p/14455294.html) [[原始碼分析\] 訊息佇列 Kombu 之 Consumer](https://www.cnblogs.com/rossiXYZ/p/14455093.html) [[原始碼分析\] 訊息佇列 Kombu 之 Producer](https://www.cnblogs.com/rossiXYZ/p/14455186.html) [[原始碼分析\] 訊息佇列 Kombu 之 啟動過程](https://www.cnblogs.com/rossiXYZ/p/14454934.html) [[原始碼解析\] 訊息佇列 Kombu 之 基本架構](https://www.cnblogs.com/rossiXYZ/p/14454761.html) 上文 [[原始碼解析] 並行分散式框架 Celery 之架構 (1)](https://www.cnblogs.com/rossiXYZ/p/14562308.html) 中,我們大致介紹了 Celery 的概念,用途和架構,現在回憶 Celery 的架構圖如下: ```python +-----------+ +--------------+ | Producer | | Celery Beat | +-------+---+ +----+---------+ | | | | v v +-------------------------+ | Broker | +------------+------------+ | | | +-------------------------------+ | | | v v v +----+-----+ +----+------+ +-----+----+ | Exchange | | Exchange | | Exchange | +----+-----+ +----+------+ +----+-----+ | | | v v v +-----+ +-------+ +-------+ |queue| | queue | | queue | +--+--+ +---+---+ +---+---+ | | | | | | v v v +---------+ +--------+ +----------+ | worker | | Worker | | Worker | +-----+---+ +---+----+ +----+-----+ | | | | | | +-----------------------------+ | | v +---+-----+ | backend | +---------+ ``` 下面我們從幾個方面繼續分析。 ## 0x02 worker的思考 當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel, 這個連線可以有多個channel。然後,worker就會去borker的佇列裡面取相應的task來進行消費了,這也是典型的消費者生產者模式。 ### 2.1 worker的模式 首先,我們思考下worker 的工作模式,即,這些併發的 worker 彼此之間是什麼關係?是否需要一個master 來統一調控?為了更好的對比,我們先看看nginx的實現。 #### 2.1.1 Nginx模式 nginx 後臺程序包含一個master程序和多個worker程序。 - master程序主要用來管理worker程序,包含:接收來自外界的訊號,向各worker程序傳送訊號,監控worker程序的執行狀態,當worker程序退出後(異常情況下),會自動重新啟動新的worker程序。 - worker程序則處理基本的網路事件。多個worker程序之間是對等的,他們同等競爭來自客戶端的請求,各程序互相之間是獨立的。一個請求 只可能在一個worker程序中處理,一個worker程序,不可能處理其它程序的請求。 worker程序之間是平等的,每個程序處理請求的機會也是一樣的。一個連線請求過來,每個程序都有可能處理這個連線,怎麼做到的呢? - 首先,每個worker程序都是從master程序fork過來,在master程序裡面,先建立好需要listen的socket(listenfd)之後,然後再fork出多個worker程序。 - 其次,所有worker程序的listenfd會在新連線到來時變得可讀,為保證只有一個程序處理該連線,所有worker程序在註冊listenfd讀事件前搶accept_mutex,搶到互斥鎖的那個程序註冊listenfd讀事件,在讀事件裡呼叫accept接受該連線。 - 最後,當一個worker程序在accept這個連線之後,就開始讀取請求,解析請求,處理請求,產生資料後,再返回給客戶端,最後才斷開連線,這樣一個完整的請求就是這樣的了。 我們可以看到,一個請求完全由worker程序來處理,而且只在一個worker程序中處理。 #### 2.1.2 Celery 模式 ##### 2.1.2.1 模式 與 Nginx不同,在 Celery 之中,沒有 master 程序。所有的都是worker 程序。大家都在 redis 之上等待新任務。 但是,每一個worker內部,父程序和子程序內部,卻又是 master - slave 模式,也就是我們常說的主從。 - master(就是父程序)負責任務的獲取,分發,slaves 的管理(建立,增加,關閉,重啟,丟棄等等),其他輔助模組的維護等等。 - slave(就是子程序)負責消費從排程器傳遞過來的任務。 worker內部 具體流程如下: - 排程器首先**預生成**(prefork)一些工作程序,做為一個**程序池**(mutiprocessing-pool),之後通過**事件驅動**(select/poll/epoll)的方式,監聽核心的事件(讀、寫、異常等等)。 - 如果master監聽到就執行對應的回撥,源源不斷的從 **中間人**(broker)那裡提取任務,並通過 **管道**(pipe)作為程序間通訊的方式,運用一系列的路由策略(round-robin、weight 等等)交給slave。 - slave工作程序 消費(ack)任務,再通過管道向排程器進行狀態同步(sync),程序間通訊等等行為。 - 這個 workloop 其實很明顯,就是監聽讀管道的資料(主程序從這個管道的另一端寫),然後執行對應的回撥,期間會呼叫 put 方法,往寫管道同步狀態(主程序可以從管道的另一端讀這個資料)。 具體如下圖: ```python +-------------+ | | | Redis | | | +---+------+--+ ^ ^ | | | | +-----------------------+ +--------------+ | | | | | | +------------------+--------------------+ +-----------+--------+ | Worker 1 | | Worker n | | | | | | | | | | Parent process | | Parent process | | + | | + | | | | | | | | | | | | | | +--------+------------+ | | | | | | | | | | | | | | | | | | | v v | | v | | subprocess 1 ... subprocess n | ... | subprocess | | | | | +---------------------------------------+ +--------------------+ ``` ##### 2.1.2.2 互動 在 Celery 中,採用的是分散式的管理方式,每個節點之間都是通過廣播/單播進行通訊,從而達到協同效果。 在處理具體控制管理工作時候,worker 程序之間有交流,具體分為兩種: - 啟動時候使用 Mingle 模組來互相交換資訊。 - 執行狀態下,通過 gossip 協議進行狀態的共享。但是這個狀態共享對於任務的分配和worker 的排程沒有必然的聯絡,只是用來監控和響應控制檯訊息。因為假如有若干 worker,面對一個控制檯訊息,應該只有一個 worker 來響應其訊息,所以就利用 gossip 協議選舉出一個 leader,這個 leader 進行響應。 在處理具體業務工作時候,worker 之間沒有交流。 當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel,一個連線可以有多個channel。然後,worker就會去borker的佇列裡面取相應的task來進行消費了,這也是典型的消費者生產者模式。 以 redis 為例,底層 Kombu 事實上是使用 redis 的 BRPOP 功能來完成對具體 queue 中訊息的讀取。 如果多個 worker 同時去使用 brpop 獲取 broker 訊息,那麼具體哪一個能夠讀取到訊息,其實這就是有一個 競爭機制,因為redis 的單程序處理,所以只能有一個 worker 才能讀到。 ### 2.2 worker 組成 在 worker 文件中提到:worker主要由四部分組成的:task_pool, consumer, scheduler, mediator。 這四部分依賴下面兩組資料結構工作。 - 就緒佇列:那些 立刻就需要執行的task, 這些task到達worker的時候會被放到這個就緒佇列中等待consumer執行。 - ETA:是那些有ETA引數,或是rate_limit引數的 task。這些 task 被放入 timer 佇列中,timer 負責在條件合適的情況下,把這些 task 放入執行pool。 但是實際上,mediator 在程式碼中沒有發現。也許是 mediator 成了預設功能而非元件。 #### 2.2.1 task_pool task_pool主要是用來存放的是一些worker。當啟動了一個worker,並且提供併發引數的時候,會將一些worker放在這裡面。 celery預設的併發方式是prefork,也就是多程序的方式,這裡只是celery對`multiprocessing.Pool`進行了輕量的改造,然後給了一個新的名字叫做prefork。 這個pool與多程序的程序池的區別就是這個task_pool只是存放一些執行的worker。 #### 2.2.2 consumer consumer也就是消費者, 主要是從broker那裡接受一些message。然後將message轉化為`celery.worker.request.Request` 的一個例項。並且在適當的時候,會把這個請求包裝進Task中。 Task就是用裝飾器 `app_celery.task()` 裝飾的函式所生成的類,所以可以在自定義的任務函式中使用這個請求引數,獲取一些關鍵的資訊。 #### 2.2.3 Scheduler 對於 Scheduler,可以從 Beat 和 Timer 兩個方面講述。 ##### 2.2.3.1 Beat Celery Beat:任務排程器,Beat程序會讀取配置檔案的內容,週期性地將配置中到期需要執行的任務傳送給任務佇列。 其中樞部分就是 **Scheduler**,**Service** 是驅動部分,最後的承載實體就是 **SchedulerEntry**。 其內部主要資料結構是一個最小堆,它的作用就是承載了所有我們設定得定時任務,而最小堆的特性就是堆頂的元素是最小的,排序的依據是時間差值。celery 會先計算每個定時任務下一次執行的時間戳 - 當前時間戳,然後根據這個時間差值進行排序,毫無疑問,差值最小的就是下一次需要執行的任務。 在 Service 的 start 函式中,會呼叫 scheduler.tick(),從而在內部最小堆中獲取下次一需要執行的任務。將 `SchedulerEntry` 轉換為 `Task`,傳送到 redis 的佇列中。 具體定義如下: ```python class Scheduler: """Scheduler for periodic tasks. """ Entry = ScheduleEntry #: The schedule dict/shelve. schedule = None #: Maximum time to sleep between re-checking the schedule. max_interval = DEFAULT_MAX_INTERVAL #: How often to sync the schedule (3 minutes by default) sync_every = 3 * 60 #: How many tasks can be called before a sync is forced. sync_every_tasks = None _last_sync = None _tasks_since_sync = 0 ``` **持久化** 在 Celery 中,定時任務的執行並不會因為我們重啟了 Celery 而失效,反而在重啟 Celery 之後,Celery 會根據上一次關閉之前的執行狀態,重新計算新的執行週期,而這裡計算的前提就是能夠獲取舊的執行資訊,而在 Scheduler 中,這些資訊都是預設儲存在檔案中的。 Celery 預設的儲存是通過 Python 預設的 shelve 庫實現的,shelve 是一個類似於字典物件的資料庫,我們可以通過呼叫 `sync` 命令在磁碟和記憶體中同步資料。 ##### 2.2.3.2 Timer 文件中對於 Timer 的描述如下: > The timer schedules internal functions, like cleanup and internal monitoring, > but also it schedules ETA tasks and rate limited tasks. > If the scheduled tasks ETA has passed it is moved to the execution pool. 可以看到,其功能就是排程內部的函式,比如清理和監控,也排程ETA tasks and rate limited tasks。 對於清理,有比如 backend.process_cleanup 和 loader.on_process_cleanup。 ### 2.3 初始化過程 worker初始化過程中,各個模組的執行順序是由一個BluePrint類定義,並且根據各個模組之間的依賴進行排序執行。 *Worker* 的 *start* 方法中,其實就是執行了一個 *self.blueprint* 的 *start* 方法,這裡面的 blueprint,是 celery 自己實現的一個 **有向無環圖**(DAG)的資料結構,其功能簡單描述下就是:根據命令列傳入的不同引數,初始化不同的元件(step),並執行這些元件的初始化方法。其實就是一個對流程控制的面向物件的封裝。 每個 Step 的具體的功能如下: - Timer:用於執行定時任務的 Timer; - Hub:Eventloop 的封裝物件; - Pool:構造各種執行池(執行緒/程序/協程); - Autoscaler:用於自動增長或者 pool 中工作單元; - StateDB:持久化 worker 重啟區間的資料(只是重啟); - Autoreloader:用於自動載入修改過的程式碼; - Beat:建立 Beat 程序,不過是以子程序的形式執行(不同於命令列中以 beat 引數執行); ## 0x03 Consumer的思考 Celery 使用 Consumer 來從 broker 獲取訊息。 ### 3.1 元件 Consumer 的元件如下: - 【1】Connection:管理和 broker 的 Connection 連線 - 【3】Events:用於傳送監控事件 - 【2】Agent:`cell` actor - 【2】Mingle:不同 worker 之間同步狀態用的 - 【1】Tasks:啟動訊息 Consumer - 【3】Gossip:消費來自其他 worker 的事件 - 【1】Heart:傳送心跳事件(consumer 的心跳) - 【3】Control:遠端命令管理服務 在參考文章 [1: Worker 啟動流程概述](https://liqiang.io/post/celery-source-analysis-worker-start-flow?lang=ZH_CN) 中提到: > 這裡我對所有的 Bootstep 都做了標號處理,標號的大小說明了這些服務對於我們程式碼閱讀的重要程式,1 最重要,3 最不緊要。對於 Consumer 來說, > > 1 是基本功能,這些功能組成了一個簡單的非強壯的訊息佇列框架; > > 2 一般重要,可以實現一個高階一點的功能; > > 3 屬於附加功能,同時也屬於一點分散式的功能。 ### 3.2 作用 因此,我們可以看到,celery Consumer 元件的概念遠遠要大於Kombu的Consumer,不只是從broker取得訊息,也包括訊息的消費,分發,監控,心跳等一系列功能。 可以說,除了訊息迴圈引擎 被 hub 承擔,多程序被 Pool,Autoscaler 承擔,定時任務被 timer,beat 承擔之外,其他主要功能都被 Consumer 承擔
。 ## 0x04 高效能的思考 celery 的高效能主要靠兩個方面來保證,**一個是多程序**,**一個是事件驅動**。此外在一些具體功能實現方面也確保了高效能的實現。 ### 4.1 多程序 多程序可以良好的發揮每個核的計算能力。可以在一定程度上提升程式的併發能力,緩解 IO 的壓力。 Celery 的方案叫做 prefork,也就是預生成。預生成指的是,主程序在執行具體的業務邏輯之前,先提前 fork 出來一堆子程序,並把他們存起來集中管理,形成一個程序池。平常的時候這些子程序都是 休眠(asleep) 狀態,只有當主程序派發任務的時候,會喚醒(awake)其中的一個子程序,並通過程序間通訊的手段,向子程序傳輸相應的任務資料。 如前所述,每一個worker內部,父程序和子程序內部,是 master - slave 模式。 - master(就是父程序)負責任務的獲取,分發,slaves 的管理(建立,增加,關閉,重啟,丟棄等等),其他輔助模組的維護等等。 - 排程器首先**預生成**(prefork)一些工作程序(slave),做為一個**程序池**(mutiprocessing-pool),之後通過**事件驅動**(select/poll/epoll)的方式,監聽核心的事件(讀、寫、異常等等)。 - slave(就是子程序)負責消費從排程器傳遞過來的任務。再通過管道向排程器進行狀態同步(sync),程序間通訊等等行為。 ### 4.2 事件驅動 Kombu內部使用了事件驅動
。 Master 排程器是一個事件驅動模型,什麼是事件驅動,其實就是它消滅了阻塞。 正常的單執行緒模型,一次只能拿一條訊息,每一次都要走一條來和回的鏈路,並且需要一個 while True 的迴圈不斷的去檢測,這樣無疑是非常低效且開銷大的。 而事件驅動則不這樣,他可以同時傳送多個檢測的訊號,然後就直接掛起,等待核心進行提示,有提示再去執行對應的回撥。這樣既優雅的化解了單執行緒每次都要檢測的 while True,又通過多次請求併發降低了重複鏈路。 以 epoll 為例: - epoll可以同時支援水平觸發和邊緣觸發(Edge Triggered,只告訴程序哪些檔案描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麼它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的效能要更高一些。 - epoll同樣只告知那些就緒的檔案描述符,而且當我們呼叫epoll_wait()獲得就緒檔案描述符時,返回的不是實際的描述符,而是一個代表 就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體對映(mmap)技術,這樣便徹底省掉了 這些檔案描述符在系統呼叫時複製的開銷。 - 另一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,程序只有在呼叫一定的方法後,核心才對所有監視的檔案描 述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個檔案描述符,一旦基於某個檔案描述符就緒時,核心會採用類似callback的回撥 機制,迅速啟用這個檔案描述符,當程序呼叫epoll_wait()時便得到通知。 ### 4.3 Task的實現 Task 承載的功能就是在 Celery 應用中,啟動對應的訊息消費
者。 關於 Task 的實現,這就涉及問題:究竟是分發程式碼還是分發資料? 因為 Celery 是一個通用性功能,不是特定面對大資料,所以分發資料是不可避免的。 剩下問題就是是否需要分發程式碼? Task 任務最基本的形式就是函式,任務釋出最直接的想法就是client將要執行的相關函式程式碼打包,釋出到broker。分散式計算框架spark就是使用這種方式。 #### 4.3.1 分發程式碼 業界分發程式碼的代表是 Spark。Spark的思想比較簡單:挪計算不挪資料。那怎麼去描述這個計算呢?Spark 通過RDD封裝一個針對資料對應關係記錄,在這個封裝之上來記錄計算。這就涉及到兩個最重要的問題: - 如何拆分計算邏輯; - 如何分發計算邏輯; 於是 Spark 把所有的計算邏輯劃分為這兩種型別: - 能夠分發到各個節點上並行執行的; - 需要經過一定量的結果合併之後才能繼續執行的; 然後把**一個巨大的問題拆分成相對獨立的子問題分發到各個機器上求解。** 在實際提交時候,Spark把計算程式碼提交到每個工作節點上然後進行計算。 #### 4.3.2 Celery 模式 2.0之前的celery也支援這種任務釋出的方式。這種方式顯而易見的一個壞處是傳遞給broker的資料量可能會比較大。解決的辦法也很容易想到,就是把要釋出的任務相關的程式碼,提前告訴worker。 這就是 全域性集合 和 註解註冊的作用。 ```python @app.task(name='hello_task') def hello(): print('hello') ``` 其中的app是worker中的application,通過裝飾器的方式,對任務函式註冊。 app會維護一個字典,key是任務的名字,也就是這裡的`hello_task`,value是這個函式的記憶體地址。任務名必須唯一,但是任務名這個引數不是必須的,如果沒有給這個引數,celery會自動根據包的路徑和函式名生成一個任務名。 通過上面這種方式,client釋出任務只需要提供任務名以及相關引數,不必提供任務相關程式碼: ```python # client端 app.send_task('hello_task') ``` 這裡需要注意:client釋出任務後,任務會以一個訊息的形式寫入broker佇列,帶有任務名稱等相關引數,等待worker獲取。這裡任務的釋出,是完全獨立於worker端的,即使worker沒有啟動,訊息也會被寫入佇列。 這種方式也有顯而易見的壞處,所有要執行的任務程式碼都需要提前在worker端註冊好,client端和worker端的耦合變強了。 ### 4.4 Prefetch 目前 Kombu QoS 只是支援 prefetch_count。 設定 prefetch_count 的目的是: - **Prefetch**指的是一個Celery Worker節點,能夠提前獲取一些還**還未被其他節點執行**的任務,這樣可以提高Worker節點的執行效率。 - 同時也可以通過設定Qos的prefetch count來控制consumer的流量,防止消費者從佇列中一下拉取所有訊息,從而導致擊穿服務,導致服務崩潰或異常。 Kombu qos prefetch_count 是一個整數值N,表示的意思就是一個消費者最多隻能一次拉取N條訊息,一旦N條訊息沒有處理完,就不會從佇列中獲取新的訊息,直到有訊息被ack。 Kombu 中,會記錄 prefetch_count的值,同時記錄的還有該channel dirty (acked/rejected) 的訊息個數。 ### 4.5 Celery函式 Celery 還提供了一些工作流功能,其中某些功能可以讓我們提高效能。比如 Chunks 功能。 任務塊函式能夠讓你將需要處理的大量物件分為分成若干個任務塊,如果你有一百萬個物件,那麼你可以建立 10 個任務塊,每個任務塊處理十萬個物件。有些人可能會擔心,分塊處理會導致並行效能下降,實際上,由於避免了訊息傳遞的開銷,因此反而會大大的提高效能。 ```python add_chunks_sig = add.chunks(zip(range(100), range(100)), 10) result = add_chunks_sig.delay() result.get() ``` ## 0x05 分散式的思考 我們從負載均衡,容災恢復,worke之間互動這三個角度來看看 Celery 如何實現分散式。 ### 5.1 負載均衡 Celery 的負載均衡其實可以分為三個層次,而且是與 Kombu 高度耦合(本文 broker 以 Redis 為例)。 - 在 worker 決定 與 哪幾個 queue 互動,有一個負載均衡; - 在 worker 決定與 broker 互動,使用 brpop 獲取訊息時候有一個負載均衡; - 在 worker 獲得 broker 訊息之後,內部 具體 呼叫 task 時候,worker 內部進行多程序分配時候,有一個負載均衡。 另外,Celery 還有一個 AutoScaler 元件,其作用 實際就是線上調節程序池大小。這也和緩解負載相關。 其主要邏輯大致如下圖所示(後續文章中會有詳細講解): ```python + Kombu | |Redis | BRPOP(keys) | +------------------------------------------+ | | Worker 1 | ------------------+ | | | | | +------------------------------------------+ | | queue 1 key | | | | +------------------------------------------+ BRPOP(keys) | | | Worker 2 | +---------------------------> queue 2 key | | | | +------------------------------------------+ | | | | +------------------------------------------+ | | queue 3 key | Worker 3 | | | | | | | | +-----------+ | | | | | queue 1 | | BRPOP(keys) | | | | queue 2 | keys | | | | | ...... | +--------+---------------------------------+ | | | queue n | ^ | | | +-----------+ | | | | | | | | + | | | | | | + round_robin_cycle | | | | | | +------------------------------------------+ | | | | fair_strategy | | | +-------+----------+----------------+ | | | | | v v v | +-----+--------+ +------+-------+ +-----+--------+ | | subprocess 1 | | subprocess 2 | | subprocess 3 | + +--------------+ +--------------+ +--------------+ ``` ### 5.2 failover 容災恢復 #### 5.2.1 錯誤種類&失敗維度 Celery 之中,錯誤主要有3種: - 使用者程式碼錯誤:錯誤可以直接返回應用,因為Celery無法知道如何處理; - Broker錯誤:Celery可以根據負載平衡策略嘗試下一個節點; - 網路超時錯誤:Celery可以重試該請求; 從系統角度出發,幾個最可能的失敗維度如下: - Broker失敗; - Worker ---> Broker 這個鏈路會失敗; - Worker 節點會失敗; - Worker 中的多程序中,某一個程序本身失效; - Worker 的某一個程序中,內部處理任務失敗; 從實際處理看,broker可以使用 RabbitMQ,可以做 叢集和故障轉移;但這是涉及到整體系統設計的維度,所以本系列不做分析。 #### 5.2.2 處理方法 依據錯誤級別,錯誤處理 分別有 重試 與 fallback選擇 兩種。 我們以 Worker ---> Broker 維度為例來進行分析。此維度上主要關心的是: 1. Broker 某一個節點失效; 2. worker 與 Broker 之間網路失效; 在這個維度上,無論是 Celery 還是 Kombu 都做了努力,但是從根本來說,還是 Kombu 的努力。 ##### 5.2.2.1 重試 在 Celery 中,對於重試,有 broker_connection_max_retries 配置,就是最大重試次數。 當出現網路故障時候,Celery 會根據 broker_connection_max_retries 配置來進行重試。 在 Komub 中,同樣做了 各種 重試 處理,比如 在 Connection.py 中有如下重試引數: - max_retries:最大重試次數; - errback (Callable):失敗回撥策略; - callback (Callable):每次重試間隔的回撥函式; ##### 5.2.2.2 自動重試 自動重試是 kombu 的另外一種重試途徑,比如在 kombu\connection.py 就有 autoretry,其基本套路是: - 在呼叫fun時候,可以使用 autoretry 這個mapper 做包裝。並且可以傳入上次呼叫成功的 channel。 - 如果呼叫fun過程中失敗,kombu 會自動進行try。 ##### 5.2.2.3 fallback 如果重試不解決問題,則會使用 fallback。比如 broker_failover_strategy 是 Celery 針對 broker Connection 來設定的策略。會自動對映到 `kombu.connection.failover_strategies`。 Kombu 在配置 Connection的時候,可以設定多個 broker url,在連線 broker 的時候,kombu 自動會選取最健康的 broker 節點進行連線。 ### 5.3 Worker之間的互動 前面提到,在處理具體控制管理工作時候,在執行狀態下,worker 程序之間通過 gossip 協議進行狀態的共享。 但是這個狀態共享對於任務的分配和worker 的排程沒有必然的聯絡,只是用來監控和響應控制檯訊息。因為假如有若干 worker,面對一個控制檯訊息,應該只有一個 worker 來響應其訊息,所以就利用 gossip 協議選舉出一個 leader,這個 leader 進行響應。 Gossip 協議跟其他協議一樣,也有一些不可避免的缺陷,主要是兩個: **1)訊息的延遲** 由於 Gossip 協議中,節點只會隨機向少數幾個節點發送訊息,訊息最終是通過多個輪次的散播而到達全網的,因此使用 Gossip 協議會造成不可避免的訊息延遲。不適合用在對實時性要求較高的場景下。 **2)訊息冗餘** Gossip 協議規定,節點會定期隨機選擇周圍節點發送訊息,而收到訊息的節點也會重複該步驟,因此就不可避免的存在訊息重複傳送給同一節點的情況,造成了訊息的冗餘,同時也增加了收到訊息的節點的處理壓力。而且,由於是定期傳送,因此,即使收到了訊息的節點還會反覆收到重複訊息,加重了訊息的冗餘。 為什麼用 gossip?可能因為是用 gossip 來處理管理功能,就是在 workers 之中選出一個 leader 來響應控制檯的訊息。這樣就不需要對訊息即時性有要求。 ## 0x06 總結 通過以上的分析,大家應該對 Celery 的架構有了初步的瞭解。從下文開始,我們逐一分析 Celery 的幾個方面,敬請期待。 ## 0xFF 參考 [Nginx資料之Master與Worker基礎概念](https://blog.csdn.net/sky6even/article/details/81409800) [1: Worker 啟動流程概述](https://liqiang.io/post/celery-source-analysis-worker-start-flow?lang=ZH_CN) [2: Worker 的執行引擎](https://liqiang.io/post/celery-source-analysis-worker-execute-engine?lang=ZH_CN) [3: Task 物件的實現](https://liqiang.io/post/celery-source-analysis-task-object-implement?lang=ZH_CN) [4: 定時任務的實現](https://liqiang.io/post/celery-source-analysis-scheduler-task-implements?lang=ZH_CN) [5: 遠端控制管理](https://liqiang.io/post/celery-source-analysis-remote-manager-control?lang=ZH_CN) [6: Events 的實現](https://liqiang.io/post/celery-source-analysis-event-implemention?lang=ZH_CN) [7: Worker 之間的互動](https://liqiang.io/post/celery-source-analysis-interact-between-workers?lang=ZH_CN) [8: State 和 Result](https://liqiang.io/post/state-and-result-implement-in-celery?lang=ZH_CN) [Spark分散式計算引擎的應用](https://zhuanlan.zhihu.com/p/63