1. 程式人生 > 實用技巧 >celery 元件在django環境應用

celery 元件在django環境應用

第一步安裝

pip install celery==4.4

第二步 配置環境

# ############################# celery 配置連線redis #############################
#新增密碼的
CELERY_BROKER_URL = 'redis://:[email protected]:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://:[email protected]:6379/0'
CELERY_TASK_SERIALIZER = 'json'

第三步   【專案/專案/celery.py】在專案同名目錄建立 celery.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# 拒絕隱式引入,因為celery.py的名字和celery的包名衝突,需要使用這條語句讓程式正確執行
from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'auction.settings')

app =Celery('auction')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings',namespace='CELERY')

# Load task modules from all registered Django app configs.
# 去每個已註冊app中讀取 tasks.py 檔案
app.autodiscover_tasks()

 第四步【專案/app名稱/tasks.py】 

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

第五步 【專案/專案/__init__.py 

from .celery import app as celery_app

__all__ = ('celery_app',)

第六步 啟動worker

進入專案目錄

celery worker -A s -l info -P eventle

 第七步 編寫檢視函式,呼叫celery去建立任務

from django.shortcuts import HttpResponse
from api.tasks import x1

def create_task(request):
    print('請求來了')
    result = x1.delay(2,2)
    print('執行完畢')
    return HttpResponse(result.id)


def get_result(request):
    nid = request.GET.get('nid')
    from celery.result import AsyncResult
    # from demos.celery import app
    from demos import celery_app
    result_object = AsyncResult(id=nid, app=celery_app)
    # print(result_object.status)
    data = result_object.get()
    return HttpResponse(data)

  第八步啟動django程式

http://127.0.0.1:8000/api/get/task/?nid=68d4c97e-92e5-42e4-a04d-c1a8f35fb4a
http://127.0.0.1:8000/api/create/task/

  注意事項:

本地時間和URF實際轉換:

Python3 : 
    # 本地時間轉換成utc時間
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    target_time = utc_ctime + datetime.timedelta(seconds=10)
    result = x1.apply_async(args=[11, 3], eta=target_time)
    
python2 :
    
    # “”“本地時間轉UTC時間(-8:00)”“”
     time_struct = time.mktime(ctime.timetuple())
     utc_st = datetime.datetime.utcfromtimestamp(time_struct)



     # utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
     target_time = utc_st + datetime.timedelta(seconds=30)