1. 程式人生 > 實用技巧 >celery分散式非同步框架

celery分散式非同步框架

celery分佈非同步框架

1.什麼是Celery

Celery是一個簡單、靈活且可靠的,處理大量訊息的分散式系統

專注於實時處理的非同步任務佇列

同時也支援任務排程

Celery架構

Celery的架構由三部分組成,訊息中介軟體(message broker),任務執行單元(worker)和任務執行結果儲存(task result store)組成。

訊息中介軟體

Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,RabbitMQ, Redis等等

任務執行單元

Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis等

版本支援情況

python 3.6版本支援celery 4.2.1

Celery version 4.0 runs on
        Python ❨2.7, 3.4, 3.5❩
        PyPy ❨5.4, 5.5❩
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 
2.6: Celery series 3.1 or earlier. Python 2.5: Celery series 3.0 or earlier. Python 2.4 was Celery series 2.2 or earlier. Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

2.使用場景

非同步任務:將耗時操作任務提交給Celery去非同步執行,比如傳送簡訊/郵件、訊息推送、音視訊處理等等

定時任務:定時執行某件事情,比如每天資料統計

回到頂部

3.Celery的安裝配置

pip install celery

訊息中介軟體:RabbitMQ/Redis

app=Celery('任務名',backend='xxx',broker='xxx')

回到頂部

4.Celery執行非同步任務

基本使用

建立專案celerytest

建立py檔案:celery_app_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:[email protected]:6379/1'
broker='redis://:[email protected]:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y

建立py檔案:add_task.py,新增任務

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

建立py檔案:run.py,執行任務,或者使用命令執行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

eventlet此模組需要另外安裝

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

建立py檔案:result.py,檢視任務執行結果

from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

執行 add_task.py,新增任務,並獲取任務ID

執行 run.py ,或者執行命令:celery worker -A celery_app_task -l info

執行 result.py,檢查任務狀態並獲取結果

多工結構

pro_cel
    ├── celery_task# celery相關資料夾
    │   ├── celery.py   # celery連線和配置相關檔案,必須叫這個名字
    │   └── tasks1.py    #  所有任務函式
    │    └── tasks2.py    #  所有任務函式
    ├── check_result.py # 檢查結果
    └── send_task.py    # 觸發任務

celery.py

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含以下兩個任務檔案,去相應的py檔案中找任務,對多個任務做分類
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# 時區
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

tasks1.py

import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任務結果:%s"%res

tasks2.py

import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任務結果:%s"%res

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結果刪除,執行完成,結果不會自動刪除
    # async.revoke(terminate=True)  # 無論現在是什麼時候,都要終止
    # async.revoke(terminate=False) # 如果任務還沒有開始執行呢,那麼就可以終止。
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

send_task.py

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 立即告知celery去執行test_celery任務,並傳入一個引數
result = test_celery.delay('第一個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)

新增任務(執行send_task.py),開啟work:celery worker -A celery_task -l info -P eventlet,檢查任務執行結果(執行check_result.py)

回到頂部

5.Celery執行定時任務

設定時間讓celery執行一個任務

add_task.py

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())將當前時間轉成utc格式
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 預設用utc時間,需要轉成utc時間格式
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async並設定時間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

類似於contab的定時任務

多工結構中celery.py修改如下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執行tasks1下的test_celery函式
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒執行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞引數
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     每年4月11號,8點42分執行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

啟動一個beat:celery beat -A celery_task -l info

注意:同一個目錄下只能開啟一個beat

啟動work執行:celery worker -A celery_task -l info -P eventlet

回到頂部

6.Django中使用Celery

提示:django-celery模組相容不太友好建議不使用,可以直接用多工結構,在任何python框架中都能使用

安裝包

celery==3.1.25
django-celery==3.1.20

在專案目錄下建立celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
# 設定併發worker數量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
# 每個worker最多執行100個任務被銷燬,可以防止記憶體洩漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時時間
CELERYD_TASK_TIME_LIMIT=12*30

在app01目錄下建立tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

檢視函式views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 預設用utc時間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'