1. 程式人生 > >Celery學習筆記(一)

Celery學習筆記(一)

開始使用Celery

使用celery包含三個方面:1. 定義任務函式。2. 執行celery服務。3. 客戶應用程式的呼叫

1.使用 python 虛擬環境 模擬兩個不同的 主機。

在這裡插入圖片描述 在這裡插入圖片描述

2.建立一個檔案 tasks.py輸入下列程式碼:
from celery import Celery 
broker = 'redis://127.0.0.1:6379/5'
backend = 'redis://127.0.0.1:6379/6'
app = Celery('tasks', broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y

上述程式碼匯入了celery,然後建立了celery 例項 app,例項化的過程中指定了任務名tasks(和檔名一致),傳入了broker和backend。然後建立了一個任務函式add。

3.啟動celery服務。在當前命令列終端執行(分別在 env1 和 env2 下執行):
celery -A tasks worker  --loglevel=info  

此時可以看到輸出,其中包括註冊的任務。 在這裡插入圖片描述

客戶端互動式除錯

在當前tasks.py檔案下進入python3環境

In [0]:from tasks import add
In [1]: r = add.delay(2, 2)
In [2]: add.delay(2, 2)
Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>
 
In [3]: r = add.delay(3, 3)
 
In [4]: r.re
r.ready   r.result  r.revoke
 
In [4]: r.ready()
Out[4]: True
 
In [6]: r.result
Out[6]: 6
 
In [7]: r.get()
Out[7]: 6

呼叫 delay 函式即可啟動 add 這個任務。這個函式的效果是傳送一條訊息到broker中去,這個訊息包括要執行的函式、函式的引數以及其他資訊,具體的可以看 Celery官方文件。這個時候 worker 會等待 broker 中的訊息,一旦收到訊息就會立刻執行訊息。 在這裡插入圖片描述 現在是在python環境中呼叫的add函式,實際上通常在應用程式中呼叫這個方法。

注意:如果把返回值賦值給一個變數,那麼原來的應用程式也會被阻塞,需要等待非同步任務返回的結果。因此,實際使用中,不需要把結果賦值。

啟動了一個任務之後,可以看到之前啟動的worker已經開始執行任務了。

使用配置檔案

Celery 的配置比較多,可以在

官方配置文件 查詢每個配置項的含義。

上述的使用是簡單的配置,下面介紹一個更健壯的方式來使用celery。首先建立一個python包,celery服務,姑且命名為proj。目錄檔案如下: 在這裡插入圖片描述

# -*- coding:utf-8 -*-

from __future__ import absolute_import
from celery import Celery

app = Celery('proj',include=['proj.tasks'])

app.config_from_object('proj.config')

if __name__ == '__main__':
    app.start()

此時並沒有指定broker和backend。

# -*- coding:utf-8 -*-

from __future__ import absolute_import

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
# -*- coding:utf-8 -*-

from __future__ import absolute_import
from proj.celery import app

@app.task
def add(x,y):
    return x + y

使用方法:在 proj 的同一級目錄執行 celery:

celery -A proj worker -l info

客戶端使用任務直接呼叫tasks檔案的函式即可

任務排程

新建一個proj1的python包 在這裡插入圖片描述

# -*- coding:utf-8 -*-

from __future__ import absolute_import
from celery import Celery

app = Celery('proj1',include=['proj1.tasks'])

app.config_from_object('proj1.celeryconfig')

if __name__ == '__main__':
    app.start()
# -*- coding:utf-8 -*-

from __future__ import absolute_import
from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def taskA(x,y):
    return x + y

@app.task
def taskB(x,y,z):
    return x + y + z

@app.task
def taskC(x,y):
    return x + y

上面的tasks.py中,首先定義了一個Celery物件,然後用celeryconfig.py對celery物件進行設定,之後再分別定義了三個task,分別是taskA,taskB和taskC。接下來看一下 celeryconfig.py 檔案

# -*- coding:utf-8 -*-

from __future__ import absolute_import
from kombu import Exchange, Queue

BROKER_URL = "redis://192.168.59.129:6379/0"
CELERY_RESULT_BACKEND = "redis://192.168.59.129:6379/0"

# 任務佇列分配
CELERY_QUEUES = (
     Queue('default',Exchange("default"),routing_key="default"),
     Queue('for_task_A',Exchange('for_task_A'),routing_key='task_a'),
     Queue('for_task_B',Exchange('for_task_B'),routing_key='task_b')
)

CELERY_ROUTES = {
    'tasks.taskA': {"queue": "for_task_A", "routing_key": "task_a"},
    'tasks.taskB': {"queue": "for_task_B", "routing_key": "task_b"}
}

celeryconfig.py 檔案中,首先設定了brokel以及result_backend,接下來定義了三個Message Queue,並且指明瞭Queue對應的Exchange(當使用Redis作為broker時,Exchange的名字必須和Queue的名字一樣)以及routing_key的值。

現在在一臺主機上面啟動一個worker,這個worker只執行for_task_A佇列中的訊息,這是通過在啟動worker時使用-Q Queue_Name引數指定的。

celery -A tasks worker -l info -n worker.%h -Q for_task_A

然後在另一臺主機中,切換路徑到工程目錄下,執行以下程式碼來啟動taskA:

from tasks improt *
task_A_re = taskA.delay(100,200)

此時task_A訊息會立馬傳送到for_task_A佇列中。剛開啟任務A的主機立即執行任務。

重複上面的過程,在另外一臺機器上啟動一個worker專門執行for_task_B中的任務。修改上一步驟的程式碼,把 taskA 改成 taskB 並執行。 而在 celeryconfig.py 檔案中還定義了個預設路由default,沒有指定任務route到哪個queue中執行或者指定的route沒配置有,執行任務的時候都會自動分配到default佇列中。

Scheduler(定時任務,週期性任務)

現實中我們經常會遇到定時任務的需求。而在celery中實現定時任務也很簡單。只需要設定celery物件的CELERYBEAT_SCHEDULE屬性即可。

# -*- coding:utf-8 -*-

from __future__ import absolute_import
from kombu import Exchange, Queue

BROKER_URL = "redis://192.168.59.129:6379/0"
CELERY_RESULT_BACKEND = "redis://192.168.59.129:6379/0"

# 任務佇列分配
CELERY_QUEUES = (
     Queue('default',Exchange("default"),routing_key="default"),
     Queue('for_task_A',Exchange('for_task_A'),routing_key='task_a'),
     Queue('for_task_B',Exchange('for_task_B'),routing_key='task_b')
)

CELERY_ROUTES = {
    'tasks.taskA': {"queue": "for_task_A", "routing_key": "task_a"},
    'tasks.taskB': {"queue": "for_task_B", "routing_key": "task_b"}
}

# Scheduler (定時任務,週期性任務)

CELERY_TIMEZONE = 'Asia/Shanghai'

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'taskA_schedule':{
        'task': 'proj1.tasks.taskA',
        'schedule': timedelta(seconds=5),
        'args' : (100,100)
    },
    'taskB_schedule':{
            'task': 'proj1.tasks.taskB',
            'schedule': timedelta(seconds=10),
            'args' : (100,200,300)
        },
    'taskC_schedule':{
            'task': 'proj1.tasks.taskC',
            'schedule': timedelta(seconds=15),
            'args' : (200,200)
        }
}

定義了3個定時任務,即每隔5s執行taskA任務,引數為(100,100),每隔10s執行taskB任務,引數為(100,200,300),每隔15s執行taskC任務,引數為(200,200).切換到工程同級目錄下,通過下列命令啟動定時任務: celery -A proj worker -B -l info。使用 beat 引數即可啟動定時任務。 在這裡插入圖片描述

在這裡插入圖片描述

crontab定時

celery也能通過crontab模式來設定定時。修改celeryconfig檔案

# from datetime import timedelta
# 
# CELERYBEAT_SCHEDULE = {
#     'taskA_schedule':{
#         'task': 'proj1.tasks.taskA',
#         'schedule': timedelta(seconds=0.5),
#         'args' : (100,100)
#     },
#     'taskB_schedule':{
#             'task': 'proj1.tasks.taskB',
#             'schedule': timedelta(seconds=0.5),
#             'args' : (100,200,300)
#         },
#     'taskC_schedule':{
#             'task': 'proj1.tasks.taskC',
#             'schedule': timedelta(seconds=0.5),
#             'args' : (200,200)
#         }
# }

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 7:30 A.M
    'add-every-monday-morning': {
        'task': 'tasks.tasksA',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Celery監控和管理以及命令幫助