Celery分散式任務佇列框架--基於flask實現
阿新 • • 發佈:2019-01-08
使用Celery的方法
Celery是分散式的任務佇列特點: 簡單、靈活、高可用
1) 安裝Celery
pip install celery
2) 安裝 redis
redis可以使用list結構,提供訊息佇列的功能3)建立Celery物件並指定broker代理路徑
broker 格式: redis://[:[email protected]]host:port/db
app = Celery('tasks',
broker='redis://127.0.0.1:6379/10’)
app = Celery('tasks',
backend='redis://:[email protected]:6379/7’, # 返回值存入資料庫
broker='redis://:[email protected]:6379/8') # :密碼@host/post/db
@app.task
def sendMsg(recievers,html):
#注意 在celery的子任務中必須要嘗試獲取
with manage.app.test_request_context( ):
msg = Message(subject='tpp使用者啟用-v1.0',
recipients=[recievers],
sender='[email protected]')
#需要使用html,如果使用text可能會報錯
msg.html = html
ext.mail.send(msg)
print('郵件傳送成功’)
if __name__ == '__main__':
print('--批量下訂單--')
for i in range(20 ):
#向celery傳送任務,並獲取非同步結果物件
result:AsyncResult = goOrder.delay('XB99900888'+str(i))
#實時獲取結果(任務執行結果)
#result.get(timeout=1,interval=0.5,callback=orderCallback)
print(result.get_leaf( ))
print('--下訂單已完成--')
4)建立功能函式(完成的時間不確定),並將函式交給Celery後臺執行緒
@app.task
def goOrder(orderId):
# 下訂單的功能
time.sleep(20)
print('--下訂單成功--')
5) 執行經過@app.task裝飾的函式
格式: 函式名.delay(函式的引數)
goOrder.delay(100011) # 向celery傳送執行函式的訊號
6) 在window 會出現 ValueError: not enough values to unpack…
原因: Window系統 預設不支援Python的執行緒間的事件處理
解決: pip install eventlet
啟動: celery -A tasks worker –loglevel=info -P eventlet
例:
import time
from celery import Celery
from celery.bin import celery
from celery.result import AsyncResult
from flask_mail import Message
import manage
import ext
app = Celery('tasks',
backend='redis://:[email protected]:6379/7',
broker='redis://:[email protected]:6379/8') # :密碼@host/post/db
@app.task #交給Celery佇列去呼叫
def goOrder(order_id):
print('--goOrder--')
time.sleep(5)
print('完成{}的訂單'.format(order_id))
return '{} 確認完成'.format(order_id)
def orderCallback(id,value):
print(id,'----訂單完成----',value)
@app.task
def sendMsg(recievers,html):
#注意 在celery的子任務中必須要嘗試獲取
with manage.app.test_request_context():
msg = Message(subject='使用者啟用-v1.0',
recipients=[recievers],
sender='[email protected]')
#需要使用html,如果
msg.html = html
ext.mail.send(msg)
print('郵件傳送成功')
if __name__ == '__main__':
print('--批量下訂單--')
for i in range(20):
#向celery傳送任務,並獲取非同步結果物件
result:AsyncResult = goOrder.delay('XB99900888'+str(i))
#實時獲取結果(任務執行結果)
#result.get(timeout=1,interval=0.5,callback=orderCallback)
print(result.get_leaf())
print('--下訂單已完成--')
if __name__ == '__main1__':
celery.worker_main()