1. 程式人生 > >flask celery 使用方法

flask celery 使用方法

trie play DC -- ini oca 裝包 調用 www.

一、安裝

由於celery4.0不支持window,如果在window上安裝celery4.0將會出現下面的錯誤
技術分享圖片flask_clery

你現在只能安裝
pip install celery==3.1

二、安裝py for redis 模塊

pip install redis

三、安裝redis服務

網上很多文章都寫得模棱兩可,把人坑的不要不要的!!!

Redis對於Linux是官方支持的,但是不支持window,網上很多作者寫文章都不寫具體的系統環境,大多數直接說pip install redis就可以使用redis了,真的是坑人的玩意,本人深受其毒害

對於windows,如果你需要使用redis服務,那麽進入該地址下載

https://github.com/MSOpenTech/redis/releases

redis安裝包,雙擊完成就可以了

如果你在window上不安裝該redis包,將會提示

redis.exceptions.ConnectionError: Error 10061 connecting to localhost:6379.

或者

redis.exceptions.ConnectionError

需要註意是:安裝目錄不能安裝在C盤,否則會出現權限依賴錯誤

四、添加redis環境變量

D:\Program Files\Redis

五、初始化redis

進入redis安裝目錄,打開cmd運行命令redis-server.exe redis.windows.conf

,如果出錯

  • 雙擊目錄下的redis-cli.exe
  • 在出現的窗口中輸入shutdown
  • 繼續輸入exit

六、lask 集成celyer

在Flask配置中添加配置

1
2
3
 # Celery 配置
CELERY_BROKER_URL = ‘redis://localhost:6379/0‘ # broker是一個消息傳輸的中間件
CELERY_RESULT_BACKEND = ‘redis://localhost:6379/1‘ # 任務執行器
  • 在flask工程的__init__目錄下生產celery實例
    註意!!以下代碼必須在 flask app讀取完配置文件後編寫,否則會報錯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def make_celery(app):
celery = Celery(app.import_name, broker=app.config[‘CELERY_BROKER_URL‘],
backend=app.config[‘CELERY_RESULT_BACKEND‘])
celery.conf.update(app.config)
TaskBase = celery.Task

class ContextTask(TaskBase):
abstract = True

def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask
return celery


celery = make_celery(app)

完整示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
app = Flask(__name__)
app.config.from_object(config[‘default‘])

def make_celery(app):
celery = Celery(app.import_name, broker=app.config[‘CELERY_BROKER_URL‘],
backend=app.config[‘CELERY_RESULT_BACKEND‘])
celery.conf.update(app.config)
TaskBase = celery.Task

class ContextTask(TaskBase):
abstract = True

def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask
return celery


celery = make_celery(app)

在cmd中啟動celery服務

執行命令celery -A your_application.celery worker loglevel=info,your_application為你工程的名字,在這裏為 get_tieba_film

調用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.route(‘/‘)
@app.route(‘/index‘)
def index():
print ("耗時的任務")
# 任務已經交給異步處理了
result = get_film_content.apply_async(args=[1])
# 如果需要等待返回值,可以使用get()或wait()方法
# result.wait()
return ‘耗時的任務已經交給了celery‘


@celery.task()
def get_film_content(a):
util = SpiderRunUtil.SpiderRun(TieBaSpider.FilmSpider())
util.start()

綁定

一個綁定任務意味著任務函數的第一個參數總是任務實例本身(self),就像 Python 綁定方法類似:

1
2
3
@task(bind=True) 
def add(self, x, y):
logger.info(self.request.id)

任務繼承

任務裝飾器的 base 參數可以聲明任務的基類

1
2
3
4
5
6
7
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(‘{0!r} failed: {1!r}‘.format(task_id, exc))
@task(base=MyTask)
def add(x, y):
raise KeyError()

任務名稱

每個任務必須有不同的名稱。
如果沒有顯示提供名稱,任務裝飾器將會自動產生一個,產生的名稱會基於這些信息:
1)任務定義所在的模塊,
2)任務函數的名稱

顯示設置任務名稱的例子:

1
2
3
4
5
>>> @app.task(name=‘sum-of-two-numbers‘)
>>> def add(x, y):
... return x + y
>>> add.name
‘sum-of-two-numbers‘

最佳實踐是使用模塊名稱作為命名空間,這樣的話如果有一個同名任務函數定義在其他模塊也不會產生沖突。

1
2
3
>>> @app.task(name=‘tasks.add‘)
>>> def add(x, y):
... return x + y

七、安裝flower

將各個任務的執行情況、各個worker的健康狀態進行監控並以可視化的方式展現

1
pip install flower

啟動flower(默認會啟動一個webserver,端口為5555):

1
celery flower --address=127.0.0.1 --port=5555

進入http://localhost:5555即可查看。

doc

八、常見錯誤

1
ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0:

原因是:redis-server 沒有啟動
解決方案:到redis安裝目錄下執行redis-server.exe redis.windows.conf
檢查redis是否啟動:redis-cli ping

1
line 442, in on_task_received

解決:

1
2
3
4
5
6
7
8
9
10
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{‘timelimit‘: (None, None), ‘utc‘: True, ‘chord‘: None, ‘args‘: [4, 4], ‘retries‘: 0, ‘expires‘: None, ‘task‘: ‘main.add‘, ‘callbacks‘: None,
‘errbacks‘: None, ‘taskset‘: None, ‘kwargs‘: {}, ‘eta‘: None, ‘id‘: ‘97000322-93be-47e9-a082-4620e123dc5e‘} (210b)
Traceback (most recent call last):
File "d:\vm_env\flask_card\lib\site-packages\celery\worker\consumer.py", line 442, in on_task_received
strategies[name](message, body,
KeyError: ‘main.add‘

原因:任務沒有註冊或註冊不成功,只有在啟動的時候提示有任務的時候,才能使用該任務
技術分享圖片flask_celery
解決:

  1. 你在那個類中使用celery就在哪個類中執行celery -A 包名.類名.celery worker -l info
  2. 根據上一部提示的任務列表給任務設置對應的名稱
    如在Test中
1
2
3
4
5
6
from main import app, celery

@celery.task(name="main.Test.add".)
def add(x, y):
print "ddddsws"
return x + y

目錄結構:

1
2
3
4
5
6
+ Card  # 工程
+ main
+ admin
- Task.py
- __init__.py
- Test.py

則應該啟動的命令為:

1
celery -A main.Test.celery worker -l info

同時,如果你的Task.py也有任務,那麽你還應該重新創建一個cmd窗口執行

1
celery -A main.admin.Task.celery worker -l info

celery的工作進程可以創建多個
技術分享圖片flask_celery
技術分享圖片flask_celery

參考鏈接

轉自:https://www.laoyuyu.me/2018/02/10/python_flask_celery/

celery用戶指南,強烈推薦看

redis安裝

celery使用
https://redis.io/topics/quickstart

flask celery 使用方法