1. 程式人生 > 其它 >[Celery分散式的非同步任務框架]

[Celery分散式的非同步任務框架]

[Celery分散式的非同步任務框架]

Celery介紹

Celery:分散式的非同步任務框架 Celery是一個簡單、靈活且可靠的,處理大量訊息的分散式系統,專注於實時處理的非同步任務佇列,同時也支援任務排程

可以做的事,我們用它來解決什麼問題

  • 非同步任務 (非同步的執行這個任務函式)解決耗時任務,將耗時操作任務提交給Celery去非同步執行,比如傳送簡訊/郵件、訊息推送、音視訊處理等等
  • 延遲任務(延遲5s鍾,執行一個任務(函式))解決延遲任務
  • 定時任務(定時的執行任務(函式))如果單純執行定時任務,沒必要用celery,可以使用別的

平臺問題

Celery is a project with minimal funding, so we don’t support Microsoft Windows. 
Please don’t open any issues related to that platform
譯:(Celery 是一個資金很少的專案,所以我們不支援微軟的Windows。 請不要開啟任何與該平臺相關的問題 )

在Celery中幾個基本的概念,需要先了解下,不然不知道為什麼要安裝下面的東西。概念:Broker,Backend。

什麼是Broker:

  broker是一個訊息傳輸的中介軟體,可以理解為一個郵箱。每當應用程式呼叫celery的非同步任務的時候,會向broker傳遞訊息,而後celery的worker將會取到訊息,進行程式執行,好吧,這個郵箱可以看成是一個訊息佇列,其中Broker的中文意思是經紀人,其實就是一開始說的訊息佇列,用來發送和接受資訊。這個broker有幾個方案可供選擇:RabbitMQ(訊息佇列),Redis(快取資料庫),資料庫(不推薦),等等

什麼是backend?

   通常程式傳送的訊息,發完就完了,可能都不知道對方什麼時候接受了,為此,celery實現了一個backend,用於儲存這些訊息以及celery執行的一些訊息和結果,Backend是在Celery的配置中的一個配置項CELERY_RESULT_BACKEND,作用是儲存結果和狀態,如果你需要跟蹤任務的狀態,那麼需要設定這一項,可以是Database backend,也可以是Cache backend.

對於brokers,官方推薦是rabbitmq和redis,至於backend,就是資料庫,為了簡單可以都使用redis。

Celery非同步任務框架

"""
1)可以不依賴任何伺服器,通過自身命令,啟動服務(內部支援socket)
2)celery服務為為其他專案服務提供非同步解決任務需求的
注:會有兩個服務同時執行,一個是專案服務,一個是celery服務,專案服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成專案的需求
​
人是一個獨立執行的服務 | 醫院也是一個獨立執行的服務
    正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
    人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立執行,人生病時,醫院就來解決人生病的需求

"""

Celery的架構

Celery的架構由三部分組成:

  • 訊息中介軟體(message broker)
    • Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,RabbitMQ, Redis等等
  • 任務執行單元(worker)
    • Worker是Celery提供的任務執行的單元,worker併發的執行在分散式的系統節點中
  • 任務執行結果儲存(task result store)
    • Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis等

Celery簡單使用

安裝配置,目錄組織結構

1 安裝:pip install celery
  訊息中介軟體:RabbitMQ/Redis    app=Celery('任務名',backend='xxx',broker='xxx')
    
2 兩種目錄組織結構
    -普通的
    -包管理的(提倡用包管理,結構更清晰)
# 普通的
# 如果 Celery物件:Celery(...) 是放在一個模組下的
# 1)終端切換到該模組所在資料夾位置:scripts
# 2)執行啟動worker的命令:celery worker -A 模組名 -l info -P eventlet
# 注:windows系統需要eventlet支援,Linux與MacOS直接執行:celery worker -A 模組名 -l info
# 注:模組名隨意
​
# 包管理的
# 如果 Celery物件:Celery(...) 是放在一個包下的
# 1)必須在這個包下建一個celery.py的檔案,將Celery(...)產生物件的語句放在該檔案中
# 2)執行啟動worker的命令:celery worker -A 包名 -l info -P eventlet
# 注:windows系統需要eventlet支援,Linux與MacOS直接執行:celery worker -A 模組名 -l info
# 注:包名隨意

celery執行非同步任務

基本使用

# 第一步:定義一個py檔案(名字隨意,我們叫celery_task)
from celery import Celery   # 匯入安裝的模組

backend = 'redis://127.0.0.1:6379/1' # 結果儲存,放在db1庫下
broker = 'redis://127.0.0.1:6379/2' # 訊息中介軟體,放在db2庫下
app = Celery(__name__,broker=broker, backend=backend) #__name__當前檔案的名字  celery例項化得到物件app
​
# 被它修飾,就變成了celery的任務
@app.task   
def add(a,b):   # 定義一個任務
    return a+b
​
​
#第二步:提交任務(新建一個py檔案:submit_task) 
from celery_task import add  # 在檔案中匯入我們建立的任務
# 非同步呼叫
# 只是把任務提交到了redis中,但是沒有執行,返回一個唯一標識,後期使用唯一標識去看任務執行結果
res=add.delay(33,41)   # add.delay 就是提交非同步任務
print(res)
​
​
# 使用任務執行單元來執行任務(使用命令啟動)
"""
啟動celery(app)服務:
# 非windows
 命令:celery  -A celery_task worker -l info
# windows:
 pip3 install eventlet  # 需要安裝一個檔案
 celery  -A celery_task worker -l info -P eventlet
"""
​
​
#第三步:啟動worker
# 需要去我們自己寫的celery檔案所在的目錄下執行
# celery_task py檔案的名字
# -l info日誌輸出級別是info 
# -P eventlet  在win平臺需要加上
celery -A celery_task worker -l info -P eventlet
#如果佇列裡有任務,就會執行,如果沒有任務,worker就等在這
​
​
​
# 第四步:查詢結果是否執行完成(新建一個py檔案,get_result)
from celery_task import app
from celery.result import AsyncResult
​
id = 'ed85b97a-a231-4d71-ba11-e5f6450d0b47'  
# 提交完任務會返回一個uuid 寫這就可以查詢該任務的狀態資訊

if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任務失敗')
    elif a.status == 'PENDING':   # 判斷status的值
        print('任務等待中被執行')
    elif a.status == 'RETRY':
        print('任務異常後正在重試')
    elif a.status == 'STARTED':
        print('任務已經開始被執行')
    
'''
查詢結果成功後返回到redis的資訊
{
  "status": "SUCCESS",   
   "result":  true,
   "traceback":  null,
   "children":  [],
   "date_done":  "2021-07-30T07:03:32.849985",
   "task_id":  "dbebc6e5-a700-4f91-b50c-7ef5b25c2cab"
}
'''
    
 """
流程:
 一個程式碼往裡提,提到一個訊息佇列裡,然後啟動起celry worker,
 如果有任務就執行,如果沒有任務,worker就等在這,執行了就在
 任務執行結果儲存裡,再用其他程式碼在結果儲存里根據id號查出來
 查的時候它有可能執行了有可能沒執行,沒有執行是因為程式碼提交
 了沒有啟動worker,在結果儲存裡也查不到,一但啟動worker立馬
 就會被執行再查就可以查到了
 
 """

包管理結構

1 包結構
 project
   ├── celery_task # celery包,預設都放在專案根路徑下
   │     ├── __init__.py # 包檔案
   │     ├── celery.py # celery連線和配置相關檔案,且名字必須叫celery.py
   │     ├── course_task.py  # 任務1
   │     ├── home_task.py  # 任務2
   │     └── user_task.py  # 任務3       
   ├── submit_task.py  # 提交和查詢結果的py檔案,可以在專案的任意目錄下
   └── delay_task.py  # 執行延遲任務,(注意:這個py名稱隨意)
​
celery_task:這個包就類似於一座醫院,是一個獨立執行的服務
submit_task.py :這個py檔案就類似於一個人,也是一個獨立執行的服務
​

2 啟動worker
先cd到F:\luffy\luffyapi\luffyapi\scripts目錄下,這個時候直接啟動celery_task這個包就行
但前提是celery_task包下必須有celery.py這個檔案

celery.py

from celery import Celery   # 匯入celery模組

backend = 'redis://127.0.0.1:6379/1'    # 任務執行完畢後儲存的位置
broker = 'redis://127.0.0.1:6379/2'
# 表示三個task檔案統一被app管理起來了
app = Celery(__name__, broker=broker, backend=backend,
             include=['celery_task.course_task', 'celery_task.user_task', 'celery_task.home_task'])

三個任務↓↓↓

home_task.py

# 計算加法任務
from .celery import app
@app.task
def add(a,b):
    import time
    time.sleep(10)
    return a+b

course_task.py

# 寫檔案任務
from .celery import app
@app.task
def wirte_file(s):
    with open('s1.txt','w',encoding='utf-8') as f:
        f.write(s)
​

user_task.py

# 計算乘法任務
from .celery import app
@app.task
def mul(a,b):
    return a*b

submit_task.py

# 在專案中任意位置使用
# 提交任務
from celery_task.user_task import mul
​
res=mul.delay(90,80)
print(res)
​
# 查詢結果   (可以將查詢結果的程式碼放入另一個py檔案下  單獨查詢)
from celery_task.celery import app
from celery.result import AsyncResult
​
id = 'bc893136-44aa-4bdf-8a58-8e639191d7c6'  # 提交任務後回返回一個uuid 寫入此處即可
if __name__ == '__main__':
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任務失敗')
    elif a.status == 'PENDING':
        print('任務等待中被執行')
    elif a.status == 'RETRY':
        print('任務異常後正在重試')
    elif a.status == 'STARTED':
        print('任務已經開始被執行')

在luffy專案app中測試

# 演示非同步在 views.py中
from celery_task.home_task import add
from celery.result import AsyncResult
from celery_task import app
​
def test(request):
    # 提交一個計算 90+80的任務,需要10秒中才能拿到結果  
    res=add.delay(90,80)
    return HttpResponse('任務已經提,任務id為:%s'%str(res))
​
def get_result_test(request):   # FBV
    id=request.GET.get('id')
​
    a = AsyncResult(id=id, app=app)
    if a.successful():
        result = a.get()
        print(result)
    elif a.failed():
        print('任務失敗')
    elif a.status == 'PENDING':
        print('任務等待中被執行')
    elif a.status == 'RETRY':
        print('任務異常後正在重試')
    elif a.status == 'STARTED':
        print('任務已經開始被執行')
​
    return HttpResponse('任務執行的結果是%s'%result)
​
​
# 路由urls.py
from django.urls import path
from . import views
urlpatterns = [
    path('test/', views.test),
    path('get_result_test/', views.get_result_test),
]
"""
做成非同步任務需要兩次操作
http://127.0.0.1:8000/user/test/  # 訪問路徑獲取id
http://127.0.0.1:8000/user/get_result_test/?id=獲取的id  # 可以拿到結果
"""

celery 執行延遲任務

# delay_task.py
# 其他不用變,提交任務的時候
from celery_task.user_task import mul
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=10) # 當前utc時間,往後推10s,時間物件
​
'''  如果配置為上海時區則不需要指定datetime.utcnow()  直接datetime.now() 即可
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
'''


# print(eta)
# 提交延遲任務
# args:mul任務的引數
# eta:延遲時間(延遲多長時間執行),必須傳一個時間物件(寫數字不行)
# 使用utc時間
# 10s後執行這個任務
# 引數傳遞需要使用args,傳時間要使用時間物件,使用的是utc時間
mul.apply_async(args=(200, 50), eta=eta)
​
"""
先啟動worker在執行延遲任務,可以發現我們指定的是10秒中,
也就是說worker在沒有被佔用的情況下,可以在10秒後執行任務
"""

celery執行定時任務

#第一步:在celery.py中配置
​
# 修改時區配置
# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
​
# 任務的定時配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    # 定時任務一,每隔3秒做一次
    'task-mul': {
        'task': 'celery_task.user_task.mul',
        'schedule': timedelta(seconds=3),  # 3s後
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點
        'args': (3, 15),
    },
    # 定時任務二,每隔10秒做一次
    'task-add': {
        'task': 'celery_task.home_task.add',
        'schedule': timedelta(seconds=10),  # 10s後
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點
        'args': (3, 5),
    },
}
​
#第二步:啟動beat(beat負責定時提交任務)
celery -A celery_task beat -l info
​
# 第三步:啟動worker,任務就會被worker執行了
celery -A celery_task worker -l info -P eventlet

定時更新(應用)

路飛專案首頁輪播圖定時更新(非同步更新思路)

1 首頁輪播圖加入快取了
2 雙寫一致性問題(mysql資料改了,redis快取資料還是一樣的)
3 幾種更新快取的思路
    -定時更新(具體介面,看公司需求),我們為了測試,快一些,3s更新一次
    -非同步更新(咱們現在沒有)
        -新增輪播圖介面---》新增完輪播圖後--》執行非同步更新輪播圖的任務--》update_banner.dely()
        
4 程式碼編寫

應用

####### 第1步:在celery.py中
from celery import Celery
# 在指令碼中匯入django環境   
# 因為celery和django是兩個獨立的服務 互不影響  所以在django中想用celery的話需要在指令碼中匯入django環境
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
django.setup()  
​
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,
             include=['celery_task.course_task', 'celery_task.user_task', 'luffyapi.apps.home.home_task'])
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
​

# 任務的定時配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'update_banner': {
        'task': 'luffyapi.apps.home.home_task.update_banner',
        'schedule': timedelta(seconds=3),  # 3s後
    },
}


####### 第2步:在home的app下新建home_task.py 用於寫入更新快取的程式碼
from .celery import app  # 匯入celery.py下的app
@app.task
def update_banner():
    from home.models import Bannder
    from home.serializer import BannerSerializer
    from django.conf import settings
    from django.core.cache import cache
    # 更新輪播圖快取的任務
    #只要這個task被執行一次,快取中的輪播圖就是最新的
    queryset = Bannder.objects.all().filter(is_show=True, is_delete=False).order_by('-orders')[0:settings.BANNER_SIZE]
    ser = BannerSerializer(instance=queryset,many=True)
    # 小問題,圖片前面的路徑是沒有帶的,處理路徑
    for item in ser.data:
        item['image']= settings.REMOTE_ADDR+item['image']
​
    cache.set('banner_cache_list',ser.data)
    return True

# user_settings.py
# 後端專案地址:處理圖片路徑,專案上線可改配置
REMOTE_ADDR='http://127.0.0.1:8000'


####### 第3步:一定要讓django啟動在celery.py檔案中加入
    #在指令碼中匯入django環境
    import os
    import django
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
    django.setup()
    
    
####### 第4步 啟動django專案,啟動beat,啟動worker
	# 啟動worker
	celery -A celery_task worker -l info -P eventlet    
    # 啟動beat的命令(負責每隔幾秒鐘,向任務佇列中提交任務)
	celery beat -A celery_task -l info
    -每隔3s就會更新快取