1. 程式人生 > >分散式任務佇列Celery的介紹

分散式任務佇列Celery的介紹

架構組成

一個完整的Celery分散式佇列架構應該包含一下幾個模組:

    1. 訊息中間人 Broker

訊息中間人,就是任務排程佇列,通常以獨立服務形式出現。它是一個生產者消費者模式,即主程式將任務放入佇列中,而後臺職程則會從佇列中取出任務並執行。任務可以按順序排程,也可以按計劃時間排程。Celery元件本身並不提供佇列服務,你需要整合第三方訊息中介軟體。Celery推薦的有RabbitMQ和Redis,另外也支援MongoDB、SQLAlchemy、Memcached等,但不推薦。

    1. 任務執行單元 Worker,也叫職程

即執行任務的程式,可以有多個併發。它實時監控訊息佇列,獲取佇列中排程的任務,並執行它。

    1. 執行結果儲存 Backend

由於任務的執行同主程式分開,如果主程式想獲取任務執行的結果,就必須通過中介軟體儲存。同訊息中間人一樣,儲存也可以使用RabbitMQ、Redis、MongoDB、SQLAlchemy、Memcached等,建議使用帶持久化功能的儲存中介軟體。另外,並非所有的任務執行都需要儲存結果,這個模組可以不配置。

完整的架構組成圖如下:celery

執行一個例子

讓我們跑一個例子,我們使用RabbitMQ作為中間人,Redis作為結果儲存。關於RabbitMQ和Redis的安裝,大家可以網上搜搜,這裡就不贅述了。本例假設RabbitMQ和Redis都安裝在本地機上。

首先我們要安裝Celery

當前最新的版本是4.0.0,我們可以通過PyPI安裝:

1 $pip install celery

為了支援Redis,你還需要安裝Celery對Redis的依賴:

1 $pip
install'celery[redis]'
然後,我們編寫任務程式碼tasks.py
Python
123456789 fromcelery importCeleryapp=Celery('tasks',broker='amqp://[email protected]//',backend='redis://localhost:6379/0')@app.taskdefadd(x,y):returnx+y
這裡我們建立了一個Celery例項app,名稱為’tasks’;中間人用RabbitMQ,URL為’amqp://[email protected]//’;儲存用Redis,URL為’redis://localhost:6379/0’。同時我們定義了一個Celery任務”add”,可以返回兩個引數的和。當函式使用”@app.task”修飾後,即為可被Celery排程的任務。
接下來,讓我們啟動後臺職程

職程會監聽訊息中間人佇列並等待任務排程,啟動命令為:

1 $celery worker-Atasks--loglevel=info--concurrency=5
    • 引數”-A”指定了Celery例項的位置,本例是在”tasks.py”中,celery命令會自動在該檔案中尋找Celery物件例項。不過我更建議你指定Celery物件名稱,如”-A tasks.app”。
    • 引數”loglevel”指定了日誌等級,也可以不加,預設為warning。
    • 引數”concurrency”指定最大併發數,預設為CPU核數。啟動成功後,你會看到如下資訊:worker-start

關於celery命令引數的更多資訊,你可以用下面的命令來查詢:

12 $celery help$celery worker--help

更詳細的資訊,就要去查閱官方文件

現在,讓我們發些任務出來吧

開啟python控制檯,輸入下面的指令:

123 >>>from tasks import add>>>add.delay(2,5)<AsyncResult:4c079d93-fd5f-47f0-8b93-c77a0112eb4e>

這個”delay()”方法會將任務傳送到訊息中間人佇列,並由之前啟動的後臺職程來執行。所以這時Python控制檯上只會返回”AsyncResult”資訊。如果你看下之前職程的啟動視窗,你會看到多了條日誌”Task tasks.add[4c079d93-fd5f-47f0-8b93-c77a0112eb4e] succeeded in 0.0211374238133s: 7″。說明”add”任務已經被排程並執行成功,並且返回7。

因為我們的程式配置了後臺結果儲存(backend),我們可以通過如下方法獲取任務執行後的返回值:

12345 >>>result=add.delay(2,5)>>>result.ready()True>>>result.get(timeout=1)7

此時我們就可以獲取到返回值7了,由於有些任務執行時間會很長,我們可以使用”result.ready()”方法來檢查任務是否執行完成。如果之前我們沒有配置backend儲存,那麼剛才的呼叫會拋異常。

關於後臺

上例中我們配置了Redis儲存,那讓我們去Redis裡看看Celery任務執行的結果是怎麼儲存的吧。通過”keys celery*”,可以查到所有屬於celery的鍵值,你會看到如下資訊。redis-keys
看來,celery是一個任務一條記錄啊,而且鍵值上帶著任務的UUID。讓我們檢視剛才執行的那條記錄的值吧,結果如下:

1 "{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 7, \"task_id\": \"4c079d93-fd5f-47f0-8b93-c77a0112eb4e\", \"children\": []}"

狀態,異常,返回值等都是通過JSon序列化存在Redis裡的,很好理解吧。

關於配置

Celery的引數配置,可以使用下面幾個方法來實現:

    • 單個引數配置
      Python
      12 app.conf.CELERY_BROKER_URL='amqp://[email protected]//'app.conf.CELERY_RESULT_BACKEND='redis://localhost:6379/0'
    • 多個引數配置
      Python
      1234 app.conf.update(CELERY_BROKER_URL='amqp://[email protected]//',CELERY_RESULT_BACKEND='redis://localhost:6379/0')
    • 從配置檔案中獲取

先將配置項放入配置檔案中,如”celeryconfig.py”

Python
12 BROKER_URL='amqp://[email protected]//'CELERY_RESULT_BACKEND='redis://localhost:6379/0'
 然後匯入到celery物件中:
Python
1 app.config_from_object('celeryconfig')

Celery的配置項相當之多,大家可以從官方文件中查詢。

delay()和apply_async()

我們之前呼叫任務使用了”delay()”方法,它其實是對”apply_async()”方法的封裝,使得你只要傳入任務所需的引數即可。對於特殊的任務排程需求,你需要使用”apply_async()”,其常用的引數有:

    • countdown: 指定多少秒後任務才被執行
    • eta: 指定任務被排程的時間,引數型別是datetime
    • expires: 任務過期時間,引數型別可以是int(秒),也可以是datetime
    • retry: 任務傳送失敗的重試次數
    • priority: 任務優先順序,範圍是0-9
    • serializer: 引數和返回值的序列化方式

詳細引數列表可以從官方文件中查詢。

關於序列化

當前版本的Celery預設序列化方式是”json”,對於剛才的例子,傳入的引數和返回值都是數值型別,用json序列化沒問題,但是對於有些物件來說,無法用JSon序列化,如果你是Python程式的話,可以使用”pickle”序列化,它支援所有Python型別物件。Celery支援的序列化方式還有”yaml”, “msgpack”。你可以在建立Celery物件時指定序列化方式,也可以通過配置指定,或者在使用apply_async()方法呼叫任務時指定:

Python
123456789101112 app=Celery('tasks',broker='...',task_serializer='yaml')app.conf.update(CELERY_TASK_SERIALIZER='pickle',CELERY_RESULT_SERIALIZER='json',)@app.taskdefadd(x,y):...add.apply_async((2,5),serializer='json')

關於併發

任務的併發預設採用多程序方式,Celery也支援gevent或者eventlet協程併發。方法是在啟動職程時使用”-P”引數:

1 $celery worker-Atasks--loglevel=info-Pgevent-c100

通過”-P gevent”我們就將併發改為了gevent方式了;”-c 100″同之前介紹的”concurrency”引數,指定了併發個數。對於gevent不熟悉的朋友,可以看看我之前的文章。另外預設多程序方式的引數值是”prefork”。

Flask同Celery的整合

本文的最後,我們演示一個將Flask應用同Celery整合的例子。最常見的任務就是發郵件,比如新使用者註冊,我們會發一個郵件來確認。由於發郵件是個IO阻塞式任務,我們可以將它交給Celery職程,而Flask應用可以繼續執行。

首先,我們寫個Flask應用,它會顯示一個表單,讓使用者填寫收件人和郵件內容,然後點擊發送按鈕來發郵件。

Python
123456789101112131415161718192021222324252627282930 fromflask importFlask,request,render_template,redirect,url_for,flashfromflask_mail importMail,Messageapp=Flask(__name__)app.config.update(SECRET_KEY='hard to guess string',MAIL_SERVER='smtp.example.com',MAIL_DEFAULT_SENDER='[email protected]',MAIL_USERNAME='bjhee',MAIL_PASSWORD='example')mail=Mail(app)@app.route('/',methods=['GET','POST'])defindex():ifrequest.method=='GET':returnrender_template('index.html')address=request.form['address']msg=Message('Hello Celery',recipients=[address])msg.body=request.form['content']mail.send(msg)flash('Sending email to %s'%address)returnredirect(url_for('index'))if__name__=='__main__':app.run(host='0.0.0.0',debug=True)

模版檔案”index.html”的內容如下:

Python
123456789101112131415161718192021 <!doctypehtml><html><head><title>TestCelery</title></head><body><h1>Sendmail</h1>{%withmessages=get_flashed_messages()%}{%ifmessages%}{%formessage inmessages%}<pstyle="color: green;">{{message}}</p>{%endfor%}{%endif%}{%endwith%}<form method="POST"><p>Address:<inputtype="text"name="address"></p><p>Content:<textarea name="content"></textarea></p><inputtype="submit"name="submit"value="Send"></form></body><