分散式任務佇列Celery的介紹
架構組成
一個完整的Celery分散式佇列架構應該包含一下幾個模組:
- 訊息中間人 Broker
訊息中間人,就是任務排程佇列,通常以獨立服務形式出現。它是一個生產者消費者模式,即主程式將任務放入佇列中,而後臺職程則會從佇列中取出任務並執行。任務可以按順序排程,也可以按計劃時間排程。Celery元件本身並不提供佇列服務,你需要整合第三方訊息中介軟體。Celery推薦的有RabbitMQ和Redis,另外也支援MongoDB、SQLAlchemy、Memcached等,但不推薦。
- 任務執行單元 Worker,也叫職程
即執行任務的程式,可以有多個併發。它實時監控訊息佇列,獲取佇列中排程的任務,並執行它。
- 執行結果儲存 Backend
由於任務的執行同主程式分開,如果主程式想獲取任務執行的結果,就必須通過中介軟體儲存。同訊息中間人一樣,儲存也可以使用RabbitMQ、Redis、MongoDB、SQLAlchemy、Memcached等,建議使用帶持久化功能的儲存中介軟體。另外,並非所有的任務執行都需要儲存結果,這個模組可以不配置。
執行一個例子
讓我們跑一個例子,我們使用RabbitMQ作為中間人,Redis作為結果儲存。關於RabbitMQ和Redis的安裝,大家可以網上搜搜,這裡就不贅述了。本例假設RabbitMQ和Redis都安裝在本地機上。
首先我們要安裝Celery
當前最新的版本是4.0.0,我們可以通過PyPI安裝:
1 | $pip install celery |
為了支援Redis,你還需要安裝Celery對Redis的依賴:
1 | $pip |
然後,我們編寫任務程式碼tasks.py
Python123456789 | fromcelery importCeleryapp=Celery('tasks',broker='amqp://[email protected]//',backend='redis://localhost:6379/0')@app.taskdefadd(x,y):returnx+y |
這裡我們建立了一個Celery例項app,名稱為’tasks’;中間人用RabbitMQ,URL為’amqp://[email protected]//’;儲存用Redis,URL為’redis://localhost:6379/0’。同時我們定義了一個Celery任務”add”,可以返回兩個引數的和。當函式使用”@app.task”修飾後,即為可被Celery排程的任務。
接下來,讓我們啟動後臺職程
職程會監聽訊息中間人佇列並等待任務排程,啟動命令為:
1 | $celery worker-Atasks--loglevel=info--concurrency=5 |
關於celery命令引數的更多資訊,你可以用下面的命令來查詢:
12 | $celery help$celery worker--help |
更詳細的資訊,就要去查閱官方文件了
現在,讓我們發些任務出來吧
開啟python控制檯,輸入下面的指令:
123 | >>>from tasks import add>>>add.delay(2,5)<AsyncResult:4c079d93-fd5f-47f0-8b93-c77a0112eb4e> |
這個”delay()”方法會將任務傳送到訊息中間人佇列,並由之前啟動的後臺職程來執行。所以這時Python控制檯上只會返回”AsyncResult”資訊。如果你看下之前職程的啟動視窗,你會看到多了條日誌”Task tasks.add[4c079d93-fd5f-47f0-8b93-c77a0112eb4e] succeeded in 0.0211374238133s: 7″。說明”add”任務已經被排程並執行成功,並且返回7。
因為我們的程式配置了後臺結果儲存(backend),我們可以通過如下方法獲取任務執行後的返回值:
12345 | >>>result=add.delay(2,5)>>>result.ready()True>>>result.get(timeout=1)7 |
此時我們就可以獲取到返回值7了,由於有些任務執行時間會很長,我們可以使用”result.ready()”方法來檢查任務是否執行完成。如果之前我們沒有配置backend儲存,那麼剛才的呼叫會拋異常。
關於後臺
上例中我們配置了Redis儲存,那讓我們去Redis裡看看Celery任務執行的結果是怎麼儲存的吧。通過”keys celery*”,可以查到所有屬於celery的鍵值,你會看到如下資訊。
看來,celery是一個任務一條記錄啊,而且鍵值上帶著任務的UUID。讓我們檢視剛才執行的那條記錄的值吧,結果如下:
1 | "{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 7, \"task_id\": \"4c079d93-fd5f-47f0-8b93-c77a0112eb4e\", \"children\": []}" |
狀態,異常,返回值等都是通過JSon序列化存在Redis裡的,很好理解吧。
關於配置
Celery的引數配置,可以使用下面幾個方法來實現:
- 單個引數配置
Python12 app.conf.CELERY_BROKER_URL='amqp://[email protected]//'app.conf.CELERY_RESULT_BACKEND='redis://localhost:6379/0'
- 多個引數配置
Python1234 app.conf.update(CELERY_BROKER_URL='amqp://[email protected]//',CELERY_RESULT_BACKEND='redis://localhost:6379/0')
- 從配置檔案中獲取
先將配置項放入配置檔案中,如”celeryconfig.py”
Python12 | BROKER_URL='amqp://[email protected]//'CELERY_RESULT_BACKEND='redis://localhost:6379/0' |
Python
1 | app.config_from_object('celeryconfig') |
Celery的配置項相當之多,大家可以從官方文件中查詢。
delay()和apply_async()
我們之前呼叫任務使用了”delay()”方法,它其實是對”apply_async()”方法的封裝,使得你只要傳入任務所需的引數即可。對於特殊的任務排程需求,你需要使用”apply_async()”,其常用的引數有:
- countdown: 指定多少秒後任務才被執行
- eta: 指定任務被排程的時間,引數型別是datetime
- expires: 任務過期時間,引數型別可以是int(秒),也可以是datetime
- retry: 任務傳送失敗的重試次數
- priority: 任務優先順序,範圍是0-9
- serializer: 引數和返回值的序列化方式
詳細引數列表可以從官方文件中查詢。
關於序列化
當前版本的Celery預設序列化方式是”json”,對於剛才的例子,傳入的引數和返回值都是數值型別,用json序列化沒問題,但是對於有些物件來說,無法用JSon序列化,如果你是Python程式的話,可以使用”pickle”序列化,它支援所有Python型別物件。Celery支援的序列化方式還有”yaml”, “msgpack”。你可以在建立Celery物件時指定序列化方式,也可以通過配置指定,或者在使用apply_async()方法呼叫任務時指定:
Python123456789101112 | app=Celery('tasks',broker='...',task_serializer='yaml')app.conf.update(CELERY_TASK_SERIALIZER='pickle',CELERY_RESULT_SERIALIZER='json',)@app.taskdefadd(x,y):...add.apply_async((2,5),serializer='json') |
關於併發
任務的併發預設採用多程序方式,Celery也支援gevent或者eventlet協程併發。方法是在啟動職程時使用”-P”引數:
1 | $celery worker-Atasks--loglevel=info-Pgevent-c100 |
通過”-P gevent”我們就將併發改為了gevent方式了;”-c 100″同之前介紹的”concurrency”引數,指定了併發個數。對於gevent不熟悉的朋友,可以看看我之前的文章。另外預設多程序方式的引數值是”prefork”。
Flask同Celery的整合
本文的最後,我們演示一個將Flask應用同Celery整合的例子。最常見的任務就是發郵件,比如新使用者註冊,我們會發一個郵件來確認。由於發郵件是個IO阻塞式任務,我們可以將它交給Celery職程,而Flask應用可以繼續執行。
首先,我們寫個Flask應用,它會顯示一個表單,讓使用者填寫收件人和郵件內容,然後點擊發送按鈕來發郵件。
Python123456789101112131415161718192021222324252627282930 | fromflask importFlask,request,render_template,redirect,url_for,flashfromflask_mail importMail,Messageapp=Flask(__name__)app.config.update(SECRET_KEY='hard to guess string',MAIL_SERVER='smtp.example.com',MAIL_DEFAULT_SENDER='[email protected]',MAIL_USERNAME='bjhee',MAIL_PASSWORD='example')mail=Mail(app)@app.route('/',methods=['GET','POST'])defindex():ifrequest.method=='GET':returnrender_template('index.html')address=request.form['address']msg=Message('Hello Celery',recipients=[address])msg.body=request.form['content']mail.send(msg)flash('Sending email to %s'%address)returnredirect(url_for('index'))if__name__=='__main__':app.run(host='0.0.0.0',debug=True) |
模版檔案”index.html”的內容如下:
Python123456789101112131415161718192021 | <!doctypehtml><html><head><title>TestCelery</title></head><body><h1>Sendmail</h1>{%withmessages=get_flashed_messages()%}{%ifmessages%}{%formessage inmessages%}<pstyle="color: green;">{{message}}</p>{%endfor%}{%endif%}{%endwith%}<form method="POST"><p>Address:<inputtype="text"name="address"></p><p>Content:<textarea name="content"></textarea></p><inputtype="submit"name="submit"value="Send"></form></body>< |