1. 程式人生 > 其它 >Celery非同步任務框架

Celery非同步任務框架

一、Celery非同步任務框架

Celery是一個非同步任務框架,並且是一個簡單、靈活可靠的,處理大量訊息的分散式系統

Celery服務為其他專案服務提供非同步解決任務的需求,內建socket

Celery可執行的任務:執行非同步任務,執行延遲任務,執行定時任務

Celery 官方文件英文版:http://docs.celeryproject.org/en/latest/index.html

二、Celery架構

Celery是由三部分組成的,訊息中介軟體(message broker)、任務執行單元(worker)、任務結果儲存(task result store)組成的。

Broker(任務中介軟體)————>Worker(任務執行者)————>Backend(任務結果倉庫)

訊息中介軟體:Celery是不提供訊息服務的,但是可以使用第三方來提供訊息服務(提供任務),列如,Redis。

任務執行單元:Worker會自動(後臺非同步)執行訊息中介軟體(broker)中的任務任務。

任務結果儲存:將Worker執行的結果儲存在backend中,可以使用Redis來儲存

三、Celery任務結構

Celery有兩種任務結構,基本結構、包架構封裝,但是我們提倡使用包架構封裝,因為結構更加清晰,例如:

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包檔案
    │   ├── celery.py   # celery連線和配置相關檔案,且名字必須叫celery.py
    │   └── tasks.py    # 所有任務函式
    ├── add_task.py  	# 新增任務
    └── get_result.py   # 獲取結果

包架構封裝

celery.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'     # broker 任務佇列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2'    # backend 結果儲存,執行結果放在這裡面

app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])

tasks.py

from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def multip(x, y):
    return x * y

非同步任務執行:

add_task.py

把 tasks.py 中的任務函式新增到 broker 中

windows 首先需要安裝:pip install celery 和 pip install eventlet

需要啟動 celery, 在包專案下輸入以下命令:

C:\project> celery -A celery_task worker -P eventlet -l info

celery -A 包名 worker -P eventlet -l info

from celery_task.tasks import add

# 提交非同步任務
ret = add.delay(5, 3)	# 把add函式任務新增到 broker中,worker在非同步實時取出執行

print(ret)	# 0cc72e56-4604-4c00-bb3d-5b456f4869a7	獲取執行結果需要此ID

延遲任務執行:

add_task.py

還是需要先啟動celery

from celery_task.tasks import multip
# 提交延遲任務
from datetime import datetime, timedelta

# 需要UTC時間
eta = datetime.utcnow() + timedelta(seconds=10)		# 當前UTC時間往後加10秒
ret = multip.apply_async(args=(9, 9), eta=eta)		# 10 秒之後執行

print(ret)	# 3c8cfa57-05ff-4a26-b8fa-1f7f2d8051f2	獲取執行結果需要此ID

定時任務執行:

執行定時任務需要從新配置celery.py

from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

broker = 'redis://127.0.0.1:6379/1'  # broker 任務佇列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2'  # backend 任務佇列,執行結果放在這裡面

app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.tasks'])

# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

app.conf.beat_schedule = {
    # add 任務
    'add-task': {
        'task': 'celery_task.tasks.add',
        'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點執行一次
        'args': (300, 150),
    },
    # multip 任務
    'multip-task': {
        'task': 'celery_task.tasks.multip',
        'schedule': timedelta(seconds=3),           # 每三秒執行一次
        'args': (300, 150),
    }
}

啟動 worker 等待執行任務

celery -A celery_task beat -l info

celery -A 包名 beat -l info

啟動 beat 將任務新增 broker 中,讓worker執行

celery -A celery_task worker -P eventlet -l info

celery -A 包名 worker -P eventlet -l info

檢視任務執行結果:

get_result.py

from celery_task.celery import app
from celery.result import AsyncResult

id = '3fedc0d8-32c8-4b1a-af43-fedfac6107a2'

if __name__ == '__main__':
    asyncs = AsyncResult(id=id, app=app)

    if asyncs.successful():
        result = asyncs.get()
        print(result)	# 成功則取出backend中id對應的值
        
    elif asyncs.failed():
        print('任務失敗')
    elif asyncs.status == 'PENDING':
        print('任務等待中被執行')
    elif asyncs.status == 'RETRY':
        print('任務異常後正在重試,或id不存在')
    elif asyncs.status == 'STARTED':
        print('任務已經開始被執行')

基本結構

建立py檔案:celery_app_task.py

from celery import Celery
import time

# backend='redis://:[email protected]:6379/1'	# 有密碼123456

broker = 'redis://127.0.0.1:6379/1'  	# broker 任務佇列,任務放到這裡面
backend = 'redis://127.0.0.1:6379/2'  	# backend 任務佇列,執行結果放在這裡面

app = Celery(__name__, broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y

啟動 worker

celery -A celery_app_task worker -P eventlet -l info

新增任務:add_task.py

from celery_app_task import add

# 提交任務
ret = add.delay(5, 3)	# 往 broker 中新增一個任務
print(ret)
學習之旅