1. 程式人生 > 實用技巧 >Celery的任務分發與定時任務

Celery的任務分發與定時任務

目錄

celery應用場景

  • celery,處理任務的Python的模組。

    • 場景1:

      對【耗時的任務】,通過celery,將任務新增到broker(佇列),然後立即給使用者返回一個任務ID。
      當任務新增到broker之後,由worker去broker獲取任務並處理任務。
      任務完完成之後,再將結果放到backend中
      
      使用者想要檢查結果,提供任務ID,我們就可以去backend中去幫他查詢。
      
    • 場景2:

      定時任務(定時釋出/定時拍賣)
      

celery的使用

Celery是由Python開發的一個簡單、靈活、可靠的處理大量任務的分發系統,它不僅支援實時處理,也支援任務排程。

支援多個broker和worker來實現高可用和分散式。

將一些耗時的任務 扔到broker佇列中,並且會返回一個任務ID,可以通過任務ID去backend佇列中獲取結果。 worker從broker獲取任務去執行,並將結果返回到backend佇列中。

函式名、引數 傳入broker

1.1 環境的搭建

pip3 install celery==4.4
安裝broker: redis或rabbitMQ
pip3 install redis / pika

1.2 快速使用

  • s1.py

    from celery import Celery
    
    app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
    
    @app.task	# 說明這個函式可以被當作celery的任務
    def x1(x, y):
        return x + y
    
    @app.task
    def x2(x, y):
        return x - y
    
    
  • s2.py

    from s1 import x1
    
    # 立即告訴celery去建立並執行x1任務,並傳兩個引數
    result = x1.delay(4, 4)
    print(result.id)	# 任務ID
    
    
  • s3.py 獲取任務結果

    from celery.result import AsyncResult
    from s1 import app
    
    result_object = AsyncResult(id="任務ID", app=app)
    print(result_object.status)	# 任務狀態
    
    data = result_object.get()	# 獲取任務結果
    print(data)
    
任務超時限制

避免某些任務一直處於非正常的進行中狀態,阻塞佇列中的其他任務。應該為任務執行設定超時時間。如果任務超時未完成,則會將 Worker 殺死,並啟動新的 Worker 來替代。

@app.task(time_limit=1800)	# 可以設定任務超時限制

執行程式:

  1. 啟動redis

  2. 啟動worker

    # 首先要進入當前目錄
    celery worker -A s1 -l info
    
    # -A s1	找到專案
    # -l info	是列印日誌log,程式碼上線時不加info
    
    windows下會報一個錯:
    
    Traceback (most recent call last):
      File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\billiard\pool.py", line 362, in workloop
        result = (True, prepare_result(fun(*args, **kwargs)))
      File "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\celery\app\trace.py", line 546, in _fast_trace_task
        tasks, accept, hostname = _loc
    ValueError: not enough values to unpack (expected 3, got 0)
    
    
    解決安裝:
    pip install eventlet
    
    celery worker -A s1 -l info -P eventlet
    
  3. 建立任務,放入broker

    python s2.py
    python s2.py
    
  4. 檢視任務狀態

    # 在s3.py填寫任務ID
    ptyhon s3.py 
    

    取消任務

    from s1 import app
    from celery.app.control import Control
    
    celery_control = Control(app=app) 
    celery_control.revoke(id, terminate=True) 
    

1.3 django中應用celery

在Django中用Django-celery。

# pip3 install django-celery (沒有用到,還是用的celery模組)

之後,需要按照django-celery的要求進行編寫程式碼。

  • 第一步:【專案/專案/settings.py 】新增配置

    CELERY_BROKER_URL = 'redis://192.168.16.85:6379'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_RESULT_BACKEND = 'redis://192.168.16.85:6379'
    CELERY_TASK_SERIALIZER = 'json'
    # CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24   # 任務過期時間
    

    Celery 配置引數彙總

    配置項 說明
    CELERY_DEFAULT_QUEUE 預設佇列
    CELERY_BROKER_URL Broker 地址
    CELERY_RESULT_BACKEND 結果儲存地址
    CELERY_TASK_SERIALIZER 任務序列化方式
    CELERY_RESULT_SERIALIZER 任務執行結果序列化方式
    CELERY_TASK_RESULT_EXPIRES 任務過期時間
    CELERY_ACCEPT_CONTENT 指定任務接受的內容型別(序列化)
  • 第二步:【專案/專案/celery.py】在專案同名目錄建立 celery.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    
    # celery指定的配置檔案 
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings')
    
    app = Celery('demos')	# 名字隨便起,省略了broker配置,配置檔案已配置
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    # 所有的celery配置都以CELERY開頭
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # 去每個已註冊app中讀取 tasks.py 檔案
    app.autodiscover_tasks()
    
  • 第三步,【專案/app名稱/tasks.py】

    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x + y
    
    @shared_task
    def mul(x, y):
        return x * y
    
  • 第四步,【專案/專案/__init__.py

    from .celery import app as celery_app
    
    __all__ = ('celery_app',)	
    
  • 啟動worker

    # 首先進入專案目錄
    
    celery worker -A demos -l info -P eventlet
    
  • 編寫檢視函式,呼叫celery去建立任務。

    • url

      # from api.views import task
      
      url(r'^create/task/$', task.create_task),
      url(r'^get/result/$', task.get_result),
      
    • 檢視函式

      from django.shortcuts import HttpResponse
      from api.tasks import x1
      from celery.result import AsyncResult
      
      # from demos.celery import app
      from demos import celery_app
      
      def create_task(request):
          print('請求來了')
          result = x1.delay(2,2)	# 新增x1任務,並返回任務ID
          print('執行完畢')
          return HttpResponse(result.id) 
      
      
      def get_result(request):
          nid = request.GET.get('nid')  
          result_object = AsyncResult(id=nid, app=celery_app)
          # print(result_object.status)
          data = result_object.get()	# 獲取資料
          return HttpResponse(data)
      
  • 啟動django程式

    python manage.py ....
    

1.4 celery定時執行

from app01 import tasks
from celery.result import AsyncResult

def time_task(request):
    """
    定時執行
    :param request:
    :return:
    """
    # 獲取本地時間
    ctime = datetime.datetime.now()
    # 轉換成utc時間
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())

    s10 = datetime.timedelta(seconds=60)    # 60s後執行
    ctime_x = utc_ctime + s10   # 執行的時間

    # 使用apply_async並設定時間
    result = tasks.mul.apply_async(args=[2, 5], eta=ctime_x)
    return HttpResponse(result.id)


def time_result(request):
    nid = request.GET.get('nid')
    from celery.result import AsyncResult
    # from demos.celery import app
    from celerytest import celery_app
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status)  # 獲取狀態
    # data = result_object.get()  # 獲取資料
    # result_object.forget()  # 把資料在backend中移除
    # result_object.revoke(terminate=True)  # 取消任務terminate=True強制取消
	
    # 通過狀態絕對返回方式
    if result_object.successful():
        # 成功
        data = result_object.get()
        result_object.forget()
    elif result_object.failed():
        # 失敗
        data = '執行失敗!'
    else:
        data = '執行中!'
    return HttpResponse(data)

支援的引數 :

  • countdown : 等待一段時間再執行.

    tasks.add.apply_async((2,3), countdown=5)
    
  • eta : 定義任務的開始時間.

    tasks.add.apply_async((2,3), eta=now+tiedelta(second=10))
    
  • expires : 設定超時時間.

    tasks.add.apply_async((2,3), expires=60)
    
  • retry : 定時如果任務失敗後, 是否重試.

    tasks.add.apply_async((2,3), retry=False)
    
  • retry_policy : 重試策略.

    • max_retries : 最大重試次數, 預設為 3 次.
    • interval_start : 重試等待的時間間隔秒數, 預設為 0 , 表示直接重試不等待.
    • interval_step : 每次重試讓重試間隔增加的秒數, 可以是數字或浮點數, 預設為 0.2
    • interval_max : 重試間隔最大的秒數, 即 通過 interval_step 增大到多少秒之後, 就不在增加了, 可以是數字或者浮點數, 預設為 0.2 .

1.5 週期性定時任務

  • celery
  • django中也可以結合使用

task與shared_task裝飾器的區別:

task是通過建立的Celery物件進行呼叫
例如:
    app1 = Celery('tasks', broker='redis://192.168.16.48:6379',)
    app2 = Celery('tasks', broker='redis://192.168.16.48:6379',)
    
    @app1.task
    def x1(x, y):
        return x - y
        
    @app2.task
    def x2(x, y):
        return x * y
    
    
多用於單一檔案中,不用直接載入到記憶體。當有多個Celery物件時,任務函式可以明確使用某一個來裝飾。

shared_task多用與多個檔案使用celery,一般在celery.py中只建立一個Celery物件,例如Django整合celery。在專案啟動時,會將celery物件載入到記憶體,而@shared_task會自動將寫在各個應用下task.py的任務交與記憶體中的Celery物件,複用性較強。
例如:
    @shared_task
    def x1(x, y):
        return x - y
        
	from web import tasks
	tasks.x1.delay(1,5)

但當在celery.py中建立多個Celery物件時,不同任務使用不同的物件,這個時候就需要指明物件名。
例如:
	from web import tasks
	
	app1.tasks.x1.delay(1,5)
	app2.tasks.x2.delay(1,5)

1.6 任務繫結,記錄日誌,重試

# 修改 tasks.py 檔案.
 
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
 
@app.task(bind=True)
def div(self, x, y):
    logger.info(('Executing task id {0.id}, args: {0.args!r}'
                 'kwargs: {0.kwargs!r}').format(self.request))
    try:
        result = x/y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)     # 發生 ZeroDivisionError 錯誤時, 每 5s 重試一次, 最多重試 3 次.
 
    return result

當使用 bind=True 引數之後, 函式的引數發生變化, 多出了引數 self, 這這相當於把 div 程式設計了一個已繫結的方法, 通過 self 可以獲得任務的上下文.

1.7 啟用任務監控

Flower 是 Celery 官方推薦的實時監控工具,用於監控 Tasks 和 Workers 的執行狀態。Flower 提供了下列功能:

  • 檢視 Task 清單、歷史記錄、引數、開始時間、執行狀態等
  • 撤銷、終止任務
  • 檢視 Worker 清單、狀態
  • 遠端開啟、關閉、重啟 Worker 程序
  • 提供 HTTP API,方便整合到運維繫統

相比檢視日誌,Flower 的 Web 介面會顯得更加友好。

Flower 的 supervisor 管理配置檔案:

[program:flower]
command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555 
directory=/opt/PyProjects/app
autostart=true
autorestart=true
startretries=3 
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=3

celery面試總結

1. Celery是一個由python開發的一個簡單、靈活、可靠的,能夠處理大量任務的系統,可以做任務的分發,也能夠做定時任務。多用於耗時的操作。例如傳送簡訊、郵箱這些功能就能使用Celery做任務分發。
2. @shared_task/@task裝飾的函式說明是這一個celery任務,會新增broker中。
3. 函式名.delay(引數),會去呼叫且執行任務,並且返回任務ID。
4. 可以根據任務ID,去backend獲取任務狀態、結果。AsyncResult(id=任務ID, app=celery_app).get()獲取任務的結果;
5. apply_async(args=[引數],eta)設定定時執行任務,eta是定時任務的執行時間(utc時間)。
6. revoke()可以取消任務。