1. 程式人生 > 程式設計 >Python celery原理及執行流程解析

Python celery原理及執行流程解析

celery簡介

celery是一個基於分散式訊息傳輸的非同步任務佇列,它專注於實時處理,同時也支援任務排程。它的執行單元為任務(task),利用多執行緒,如Eventlet,gevent等,它們能被併發地執行在單個或多個職程伺服器(worker servers)上。任務能非同步執行(後臺執行)或同步執行(等待任務完成)。

在生產系統中,celery能夠一天處理上百萬的任務。它的完整架構圖如下:

Python celery原理及執行流程解析

元件介紹:

  • Producer:呼叫了Celery提供的API、函式或者裝飾器而產生任務並交給任務佇列處理的都是任務生產者。
  • Celery Beat:任務排程器,Beat程序會讀取配置檔案的內容,週期性地將配置中到期需要執行的任務傳送給任務佇列。
  • Broker:訊息代理,又稱訊息中介軟體,接受任務生產者傳送過來的任務訊息,存進佇列再按序分發給任務消費方(通常是訊息佇列或者資料庫)。Celery目前支援RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為訊息代理,但適用於生產環境的只有RabbitMQ和Redis,官方推薦 RabbitMQ。
  • Celery Worker:執行任務的消費者,通常會在多臺伺服器執行多個消費者來提高執行效率。
  • Result Backend:任務處理完後儲存狀態資訊和結果,以供查詢。Celery預設已支援Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

工作原理

它的基本工作就是管理分配任務到不同的伺服器,並且取得結果。至於說伺服器之間是如何進行通訊的?這個Celery本身不能解決。所以,RabbitMQ作為一個訊息佇列管理工具被引入到和Celery整合,負責處理伺服器之間的通訊任務。和rabbitmq的關係只是在於,celery沒有訊息儲存功能,他需要介質,比如rabbitmq、redis、mysql、mongodb 都是可以的。推薦使用rabbitmq,他的速度和可用性都很高。

Celery安裝及使用

1、安裝celery

pip install celery

2、檢視完整可用命令選項

celery worker --help

3、建立一個工程專案project,然後再專案內建立一個celery_tasks非同步任務列表。如圖:

Python celery原理及執行流程解析

4、首先是celery_tasks非同步任務主程式main.py,程式碼如下:

from celery import Celery
# 生成celery應用
celery_app = Celery("caicai")
# 載入配置檔案
celery_app.config_from_object('celery_tasks.config')
# 註冊任務
celery_app.autodiscover_tasks(['celery_tasks.email']) # 注意:傳遞的引數是任務列表

分析一下這個程式:

  • "from celery import Celery"是匯入celery中的Celery類。celery_app
  • celery_app是Celery類的例項。
  • 把Celery配置存放進project/config.py檔案,使用celery_app.config_from_object載入配置。
  • 將任務註冊到應用中

5、接著是配置檔案config.py,程式碼如下:

BROKER_URL = 'redis://localhost:6379/1' # 使用Redis作為訊息代理

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務結果存在了Redis

# CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化和反序列化使用msgpack方案

CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果一般效能要求不高,所以使用了可讀性更好的JSON

CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # celery任務結果有效期

CELERY_ACCEPT_CONTENT = ['json','msgpack'] # 指定接受的內容型別

CELERY_TIMEZONE = 'Asia/Shanghai'       # celery使用的時區
CELERY_ENABLE_UTC = True            # 啟動時區設定
CELERYD_LOG_FILE = "/var/log/celery/celery.log"   # celery日誌儲存位置

6、建立email目錄,目錄下建立tesks.py檔案用來編寫傳送郵件的程式碼,程式碼如下:

import time
from celery_tasks.main import celery_app
@celery_app.task(name='seed_email')   # 新增celery_app.task這個裝飾器,指定該任務的任務名name='seed_email'
def seed():
  time.sleep(1)
  return "我將傳送郵件"

7、在專案app.py中,採用delay()用來呼叫任務。

from celery_tasks.email.tasks import seed
seed.delay()
seed.delay()
seed.delay()
seed.delay()
seed.delay()

8、專案執行

  首先,我們需要啟動redis。接著,切換至proj專案所在目錄,並執行命令:

celery -A celery_tasks.main worker -l info

  介面如下:

Python celery原理及執行流程解析

然後,我們執行app.py,app.py呼叫新增非同步任務,輸出的結果如下:

Python celery原理及執行流程解析

Python celery原理及執行流程解析

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。