Django中使用Celery
阿新 • • 發佈:2018-12-11
模組:
Python 3.6.6
PyMySQL==0.8.1
Django==2.1.3
redis==3.0.1
celery==4.1.1
django-celery-beat==1.1.1
django-celery-results==1.0.1
目錄結構
emgc
├── front
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── emgc
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates
配置使用
emgc/emgc/celery.py: from __future__ import absolute_import, unicode_literals import os from celery import Celery from emgc import settings # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'emgc.settings') app = Celery('emgc') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) emgc/emgc/__init__.py: from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app'] emgc/emgc/settings.py import pymysql pymysql.install_as_MySQLdb() INSTALLED_APPS = [ ... 'django_celery_results', 'django_celery_beat', 'front', ] LANGUAGE_CODE = 'zh-Hans' TIME_ZONE = 'Asia/Shanghai' USE_I18N = True USE_L10N = True USE_TZ = True #如果USE_TZ設定為True時,Django會使用系統預設設定的時區,即America/Chicago,此時的TIME_ZONE不管有沒有設定都不起作用。 # celery for redis # 由於celery-4.1.0存在時區bug,必須使用UTC時區 CELERY_RESULT_BACKEND = 'redis://localhost:6379' # BACKEND配置,這裡使用redis # CELERY_RESULT_BACKEND = 'django-db' #使用django orm 作為結果儲存 CELERY_BROKER_URL = 'redis://localhost:6379' CELERY_TIMEZONE = 'UTC' CELERY_ENABLE_UTC = True CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 使用資料庫來存放定時任務記錄 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' 進入專案的taskproj目錄啟動worker: celery -A tasks worker --pool=solo -l info (在windows下 不加--pool=solo 出現報錯ValueError: not enough values to unpack (expected 3, got 0))
定義任務
emgc/front/tasks.py:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
    """Return ``x + y``.

    Registered as a Celery shared task, so callers can queue it with
    ``add.delay(x, y)`` instead of running it inline.
    """
    return x + y
@shared_task
def mul(x, y):
    """Return ``x * y``.

    Registered as a Celery shared task, so callers can queue it with
    ``mul.delay(x, y)`` instead of running it inline.
    """
    return x * y
觸發任務
emgc/front/views.py: from django.http import JsonResponse from app01 import tasks # Create your views here. def index(request): x = int(request.GET.get("x")) y = int(request.GET.get("y")) res1=tasks.add.delay(x,y) res2=tasks.mul.delay(x,y) print("add:", x, y, res1.task_id) print("mul:", x, y, res2.task_id) return JsonResponse("success", safe=False)
從redis中獲取結果:
127.0.0.1:6379> get celery-task-meta-517829d8-ebe5-4be2-95d9-d750df717cc0 (517829d8-ebe5-4be2-95d9-d750df717cc0 為task_id)
使用django orm 作為結果儲存:
在 emgc/emgc/settings.py 中配置: CELERY_RESULT_BACKEND = 'django-db'
python3 manage.py migrate django_celery_results 生成表
(django_celery_results_taskresult)
model :
class TaskResult(models.Model):
    """Task result/status."""

    # Unique id of the task whose outcome this row records.
    task_id = models.CharField(_('task id'), max_length=255, unique=True)
    task_name = models.CharField(_('task name'), null=True, max_length=255)
    task_args = models.TextField(_('task arguments'), null=True)
    task_kwargs = models.TextField(_('task kwargs'), null=True)
    # Restricted to TASK_STATE_CHOICES; a new row starts as PENDING.
    status = models.CharField(_('state'), max_length=50,
                              default=states.PENDING,
                              choices=TASK_STATE_CHOICES
                              )
    content_type = models.CharField(_('content type'), max_length=128)
    content_encoding = models.CharField(_('content encoding'), max_length=64)
    result = models.TextField(null=True, default=None, editable=False)
    # auto_now=True: Django refreshes this timestamp on every save().
    date_done = models.DateTimeField(_('done at'), auto_now=True)
    traceback = models.TextField(_('traceback'), blank=True, null=True)
    hidden = models.BooleanField(editable=False, default=False, db_index=True)
    meta = models.TextField(null=True, default=None, editable=False)

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        # Most recently finished results come first.
        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        """Return the task's fields as a plain dict.

        ``content_type``, ``content_encoding`` and ``hidden`` are not
        included.
        """
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
        }

    def __str__(self):
        """Return a short label containing the task id and status."""
        return '<Task: {0.task_id} ({0.status})>'.format(self)