Django實現celery定時任務過程解析
阿新 • • 發佈:2020-04-22
1.首先在專案同名目錄下建一個celery.py
from __future__ import absolute_import import os from celery import Celery from datetime import timedelta from kombu import Queue # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE','OpsManage.settings') from django.conf import settings app = Celery('OpsManage') # Using a string here means the worker will not have to # pickle the object when using Windows. # 配置celery class Config: BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERY_ANNOTATIONS = {'*': {'rate_limit': '500/s'}} CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' app.config_from_object(Config) # 到各個APP裡自動發現tasks.py檔案 app.autodiscover_tasks() #crontab config app.conf.update( CELERYBEAT_SCHEDULE = { # 每隔30s執行一次函式 'every-30-min-add': { 'task': 'apps.tasks.celery_assets.push_host_by_salt_tasks','schedule': timedelta(seconds=30) # # 每天凌晨12點 # 'schedule': crontab(minute=0,hour=0) },},) # kombu : Celery 自帶的用來收發訊息的庫,提供了符合 Python 語言習慣的,使用 AMQP 協議的高階介面 Queue('transient',routing_key='transient',delivery_mode=1)
2.在settings.py裡配置celery
INSTALLED_APPS = [ ...... 'django_celery_beat','django_celery_results',]
3.在專案同名目錄下的__init__.py檔案裡申明celery任務,記得要去檢測呀
# coding:utf-8 from __future__ import absolute_import,unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from celery import app as celery_app __all__ = ['celery_app'] import pymysql pymysql.install_as_MySQLdb()
4.在task.py裡執行任務的函式上加@
from celery import task # 定時任務 @task def push_host_by_salt_tasks(): “”“balabala”“” return '這裡是定時任務'
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。