Python Django +Celery +flower
準備工作
1.建立django專案,新增應用到setting檔案
2.pip安裝celery + eventlet + flower
3.檔案目錄如下:
4.檔案配置如下
celery_app目錄下:
# -*- coding: utf-8 -*- from celery import Celery app = Celery('demo')# 建立 Celery 例項 app.config_from_object('celery_app.celeryconfig')# 通過 Celery 例項載入配置模組__init__.py檔案
BROKER_URL = 'redis://xxx.xxx.xxx.xxx:6379celeryconfig.py'# 指定 Broker CELERY_RESULT_BACKEND = 'redis://xxx.xxx.xxx.xxx:6379/0'# 指定 Backend CELERY_TIMEZONE='Asia/Shanghai'# 指定時區,預設是 UTC # CELERY_TIMEZONE='UTC' CELERY_IMPORTS = ( # 指定匯入的任務模組 'celery_app.task1' )
from celery_app import app @app.task() def add(a, b):task1.pyreturn 'hello world: %i' % (a + b)
Django_Celery目錄下:
from django.contrib import admin from django.urls import path from djocelery.views import test,getresult urlpatterns = [ path('admin/', admin.site.urls), path('test/',test.as_view()), path('getresult/',getresult.as_view()) ]urls.py檔案
djocelery目錄下:
from django.shortcuts import HttpResponse from django.views import View from celery_app.task1 import add # Create your views here. res = {} class test(View): def get(self,request,*args,**kwargs): a,b = 1,2 result = add.apply_async((a, b)) res[str(result.id)] = result return HttpResponse(result.id) def post(self,request,*args,**kwargs): return HttpResponse("POST") def put(self,request,*args,**kwargs): return HttpResponse("PUT") def delete(self,request,*args,**kwargs): return HttpResponse("DELETE") class getresult(View): def get(self,request,*args,**kwargs): id = str(request.GET.get('id')) print(id) t1 = res[id].get() print(t1) return HttpResponse(t1)views.py檔案
快速開始
1.python manage.py runserver 8080 (啟動django)
2.celery -A celery_app worker --loglevel=info -P eventlet(啟動eventlet) 注意此條命令的執行位置,在celery_app上一級目錄下執行
3.celery flower --broker=redis://xxx.xxx.xxx.xxx:6379/0 (啟動視覺化監控)
4.在瀏覽器訪問http://127.0.0.1:8080/test/,提交任務。
5.檢視celery shell視窗,如下:
6.訪問http://127.0.0.1:5555/tasks
Flower使用介紹
原文連結:https://www.jianshu.com/p/4a408657ef76
官方文件:https://flower-docs-cn.readthedocs.io/zh/latest/
Dashboard
Dashboard 頁面是展示非同步任務佇列的主要情況,該頁面包括如下幾種狀態的任務:
- Active: 表示worker從佇列中獲取到任務,且正在執行的任務數
- Processed: 表示worker從佇列中獲取到任務的總資料量
- Failed: 表示worker從佇列中獲取到任務,且執行失敗了的(異常退出)
- Successed: 表示worker從佇列中獲取到任務,且執行成功了的
- Retried: 表示worker從佇列中獲取到任務,因為一些其他原因重新執行的數量
所以,上述這些數量的關係如下:
Processed = Active + Received + Failed + Successed + Retried
其中 Received 表示該任務分配到了worker中,但是還沒有被處理的任務數量
Worker Name 表示的是執行celery任務的worker名稱;
Status 表示的是該worker的狀態,包括 Online (線上) 、 Offline(離線),重啟flower程序,即可將Offline狀態的worker剔除掉;
Active / Processed / Failed / Successed / Retried 分別表示該worker正在執行的任務數、該worker處理的總任務數、處理失敗的任務數、處理成功的任務數、重試的任務數;
Load Average 表示系統在 1min / 5min / 15min 內的CPU平均負載(百分比)
Tasks
Tasks 頁面是展示所有worker接收到的任務的處理情況。下面對該表格中的做一些介紹
Tasks頁面- Name: 表示該任務的名稱,預設規則為該函式的路徑規則,例如
{模組名}.{檔名}.{函式名}
- UUID: 表示一個唯一字串ID用於表示該任務
- State: 表示該任務的狀態,包括: SUCCESS / FAILURE / STARTED / RECEIVED
- SUCCESS 表示該任務執行完畢且成功
- FAILURE 表示該任務執行失敗
- STARTED 表示該任務正在執行
- RECEIVED 表示該任務在worker中,只是被接收而已
- args: 表示該任務的列表引數
- kwargs: 表示該任務的字典引數
- Result: 表示該任務函式的返回結果
- Received: 表示該任務在worker接收到的時間
- Started: 表示該任務在worker開始執行的時間
- Runtime: 表示該任務在worker真正執行的耗時(單位:秒)
- Worker: 表示該任務所在的worker名稱
Broker
Broker 頁面展示的是celery連線訊息佇列的資訊,包括訊息佇列的訪問URL,下面的截圖展示的是連結的RabbitMQ,當然也可以連結Redis等。
Broker頁面- Name: 表示佇列的名稱
- Messages: 表示該佇列的訊息數量
- Unacked: 表示該佇列中還沒收到ack確認的訊息數量(該訊息可能處於 RECEIVED 或是 STARTED)
- Ready: 表示該佇列中還未分配到worker的訊息數量
- Consumers: 表示消費者數量(即worker數量)
- Idle since: 表示該佇列空閒的最初時間,否則為 N/A
Monitor
Monitor 頁面展示的是celery後臺任務的曲線展示狀況。