Win10下Celery4.2.1基於redis的部署與錯誤
阿新 • • 發佈:2018-11-12
Celery是一個分散式非同步任務的神器,由Python開發但是其通訊協議可以支援其它語言。它還可以設定定時任務,設定多個任務佇列並路由任務到指定的佇列;同時還提供了執行時的一些監控和管理介面。
安裝
- 安裝python3.7(官網下載直接安裝)
- 安裝celery庫(pip install celery)
- 安裝redis庫(pip install redis)
配置\啟動worker
安裝完成之後就可以配置celery的測試程式碼了。首先是clelry的執行配置檔案celeryconfig.py(當然你也可以從命令列傳入)
BROKER_URL = 'redis://pcma.xxx.com:6379/0' CELERY_RESULT_BACKEND = 'redis://pcma.xxx.com:6379/0' CELERY_REDIS_MAX_CONNECTIONS = 4 CELERYD_CONCURRENCY = 4 BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 5} # 5min CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] # Ignore other content CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True
接著建立celery的worker程式碼,celery_worker.py
from celery import Celery
app = Celery('celery_work')
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
然後我們就可以啟動worker服務了,此後該worker會一直等待任務並執行。具體啟動命令如下:
celery -A celery_server worker --loglevel=info
這種方式預設是多程序啟動worker。如果你希望使用單程序啟動的話,命令如下:
celery -A celery_server worker --loglevel=info -P solo
另外,你還可以使用協程的方式啟動。當然首先你需要安裝eventlet。(pip install eventlet)
celery -A celery_server worker --loglevel=info -P eventlet
任務呼叫
worker啟動就緒之後,我們要做的事情就是新增要執行的任務。具體程式碼如下:
from celery_worker import add
result = add.delay(4, 4)
如果你希望能獲取到執行的結果,如下介面可以滿足你的需求:
result.ready() # 獲取任務執行狀態
result.get(timeout=1, propagate=False) # 獲取任務執行結果
result.traceback # 獲取任務執行異常時的堆疊資訊
注意:如果你需要獲取任務的結果,那邊就需要配置CELERY_RESULT_BACKEND選項。否則celery不會儲存結果。
問題及解決
由於安裝的是celery4.2.1的版本。其中有2個bug需要針對性的解決。
第1個是因為async在Python3.7已經是關鍵字了,但是celery版本沒有更新導致的。下一個釋出版本中會修復。沒有新版本之前,我們只要修改celery檔案中的async為另外的字元即可。比如我修改為了async_2。(注意定義和引用需要修改全套的)
第2個是因為windows下沒有fork多程序的模組,而celery預設啟動方式就是多程序的,並且還用了fork的方式。解決方式有多種:
- 使用單程序啟動方式(上面已列出,另注意CELERYD_CONCURRENCY配置項取消)
- 在建立celery例項之前,配置系統環境變數。os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
- 使用協程的方式啟動(上面已列出)
問題描述
問題1:
File "celery/backends/redis.py", line 22
from . import async, base
^
SyntaxError: invalid syntax
問題2:
[2017-06-08 15:31:49,416: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
File "c:\program files\python36\lib\site-packages\billiard\pool.py", line 359, in workloop
result = (True, prepare_result(fun(*args, **kwargs)))
File "c:\program files\python36\lib\site-packages\celery\app\trace.py", line 518, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)