1. 程式人生 > 程式設計 >python基於celery實現非同步任務週期任務定時任務

python基於celery實現非同步任務週期任務定時任務

這篇文章主要介紹了python基於celery實現非同步任務週期任務定時任務,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

hello,小夥伴們,好久不更新了,這一次帶來的是celery在python中的應用以及設定非同步任務週期任務和定時任務的步驟,希望能給入坑的你帶來些許幫助.

首先是對celery的介紹,Celery其實是一個專注於實時處理和排程任務的分散式任務佇列,同時提供操作和維護分散式系統所需要的全部資料,因此可以用它提供的介面快速實現並管理一個分散式的任務佇列,它本身不是任務佇列,它是封裝了操作常見任務佇列的各種操作,可以使用它快速進行任務佇列的使用與管理.在Python中的組成部分是 1.使用者任務 app 2.管道 broker 用於儲存任務 官方推薦的是 redis rabbitMQ / backend 用於儲存任務執行結果的 3,員工 worker 大致流程入下:

最左邊的是使用者,使用者發起1個請求給伺服器,要伺服器執行10個任務,將這10個任務分給10個排程器,即開啟10個執行緒進行任務處理,worker會一直監聽排程器是否有任務,一旦發現有新的任務,就會立即執行新任務,一旦執行完就會返回給排程器,即backend,backend會將請求傳送給伺服器,伺服器將結果返回給使用者,表現的結果就是,這10個任務同時完成,同時返回,這就是Celery的整個工作流程,其中的角色分別為,任務(app_work),排程器(broker + backend),將任務快取的部分,即將所有任務暫時存在的地方,相當於生產者,消費者(worker 可以指定數量,即在建立worker命令的時候可以指定數量),在worker拿到任務後,人就控制不了了,除非把worker殺死,不然肯定會執行完.

也即 任務來了以後,排程器(broker)去快取任務,worker去執行任務,完成後返回backend,接著返回,

還有就是關於定時任務和週期任務在linux上為什麼不用自身所帶著的去做,是因為linux週期定時任務是不可控的,不好管理,返回值儲存也是個麻煩事,而celery只要開啟著排程器,就可以隨時把人物結果獲取到,即使用celery控制起來是非常方便的.

接下來就是例項程式碼:

workers.py

from celery import Celery
import time
# 建立一個Celery例項,就是使用者的應用app 第一個引數是任務名稱,可以隨意起 後面的就是配置的broker和backend
diaoduqi= Celery("mytask",broker="redis://127.0.0.1:6379",backend="redis:127.0.0.1:6379")
# 接下來是為應用建立任務 ab
@diaoduqi.task
def ab(a,b):
  time.sleep(15)
  return a+b

brokers.py

from worker import ab

# 將任務交給Celery的Worker執行
res = ab.delay(2,4)

#返回任務ID
print(res.id)

backends.py

from celery.result import AsyncResult
from worker import diaoduqi

# 非同步獲取任務返回值
async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi)

# 判斷非同步任務是否執行成功
if async_task.successful():
  #獲取非同步任務的返回值
  result = async_task.get()
  print(result)
else:
  print("任務還未執行完成")

為了方便,現在直接將三個檔案代表的部分命名在檔名稱中.首先是啟動workers.py

啟動方式是依據系統的不同來啟動的,對於linux下 celery worker -A workers -l INFO 也可以指定開啟的worker數量 即在後面新增的引數是 -c 5 表示指定5個worker 理論上指定的worker是無上限的,

在windows下需要安裝一個eventlet模組進行執行,不然不會執行成功 pip install eventlet 可以開啟執行緒 不指定數量是預設6個worker,理論上worker的數量可以開啟無限個,但是celery worker -A s1 -l INFO -P eventlet -c 5 使用eventlet 開啟5個worker 執行

該命令後 處於就緒狀態,需要釋出任務,即brokers.py進行任務釋出,方法是使用delay的方式執行非同步任務,返回了一個任務id,接著去backends.py中取這個任務id,去查詢任務是否完成,判定條件即任務.successful 判斷是否執行完,上面就是celery非同步執行任務的用法與解釋

接下來就是celery在專案中的應用

在實際專案中應用celery是有一定規則的,即目錄結構應該如下.

結構說明 首先是建立一個CeleryTask的包,接著是在裡面建立一個celery.py,必須是這個檔案 關於重名的問題,找尋模組的順序是先從當前目錄中去尋找,根本找不到,接著是從內建模組中去找,根本就找不到寫的這個celery這個檔案,

celery.py

from celery import Celery
DDQ = Celery("DDQ",backend="redis://127.0.0.1:6379",include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])

TaskOne.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def one1(a,b):
  # time.sleep(3)
  return a+b
@DDQ.task
def one2():
  time.sleep(2)
  return "one2"

taskTwo.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def two1():
  time.sleep(2)
  return "two1"
@DDQ.task
def two2():
  time.sleep(3)
  return "two2"

getR.py

from CeleryTask.TaskOne import one1 as one

# one.delay(10,10)
# two.delay(20,20)

# 定時任務我們不在使用delay這個方法了,delay是立即交給task 去執行
# 現在我們使用apply_async定時執行

# 首先我們要先給task一個執行任務的時間
import datetime,time

# 獲取當前時間 此時間為東八區時間
ctime = time.time()
# 將當前的東八區時間改為 UTC時間 注意這裡一定是UTC時間,沒有其他說法
utc_time = datetime.datetime.utcfromtimestamp(ctime)
# 為當前時間增加 10 秒
add_time = datetime.timedelta(seconds=10)
action_time = utc_time + add_time

# action_time 就是當前時間未來10秒之後的時間
# 現在我們使用apply_async定時執行
res = one.apply_async(args=(6,10),eta=action_time)
res = one.apply_async(args=(6,eta=action_time)
print(res.id)
# 這樣原本延遲5秒執行的One函式現在就要在10秒鐘以後執行了

接著是在命令列cd到與CeleryTask同級目錄下,使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50 這樣 就開啟了worker 接著去 釋出任務,在定時任務中不再使用delay這個方法了,

delay是立即交給ttask去執行,在這裡使用 apply_async定時執行 指的是排程的時候去定時執行

需要設定的是UTC時間,以及定時的時間(多長時間以後執行) 之後使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令開啟worker,之後執行 getR.py檔案釋出任務,可以看到在定義的時間以後執行該任務

週期任務

週期任務 指的是在指定時間去執行任務 需要匯入的一個模組有 crontab

檔案結構如下

結構同定時任務差不多,只不過需要變動一下檔案內容 GetR檔案已經不需要了,可以刪除.

celery.py

from celery import Celery
from celery.schedules import crontab

DDQ = Celery("DDQ","CeleryTask.TaskTwo"])

# 我要要對beat任務生產做一個配置,這個配置的意思就是每10秒執行一次Celery_task.task_one任務引數是(10,10)
DDQ.conf.beat_schedule = {
  "each10s_task": {
    "task": "CeleryTask.TaskOne.one1","schedule": 10,# 每10秒鐘執行一次
    "args": (10,10)
  },"each1m_task": {
    "task": "CeleryTask.TaskOne.one2","schedule": crontab(minute=1) # 每1分鐘執行一次 也可以替換成 60 即 "schedule": 60
  }
}

TaskOne.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def one1(a,b):
  # time.sleep(3)
  return a+b
@DDQ.task
def one2():
  time.sleep(2)
  return "one2"

taskTwo.py

import time
from CeleryTask.celery import DDQ
@DDQ.task
def two1():
  time.sleep(2)
  return "two1"
@DDQ.task
def two2():
  time.sleep(3)
  return "two2"

以上配置完成以後,這時候就不能直接建立worker了,因為要執行週期任務,需要首先有一個任務的生產方,即 celery beat -A CeleryTask,用來產生建立者,接著是建立worker worker的建立命令還是原來的命令,即 celery worker -A CeleryTask -l INFO -P eventlet -c 50,建立完worker之後,每10秒就會由beat建立一個任務給 worker去執行.至此,celery建立非同步任務,週期任務,定時任務完畢,夥伴們自己拿去測試吧.

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。