1. 程式人生 > 程式設計 >Django中使用Celery的方法步驟

Django中使用Celery的方法步驟

(一)、概述

Celery是一個簡單、靈活和可靠的基於多工的分散式系統,為運營提供用於維護此係統的工具。專注於實時處理的任務佇列,同時也支援任務的排程。執行單元為任務(task),利用多執行緒這些任務可以被併發的在單個或多個職程(worker)上執行。

Celery通過訊息機制通訊,通常通過中間人(broker)來分配和調節客戶端與職程伺服器(worker)之間的通訊。客戶端傳送一條訊息,中間人把訊息分配給一個職程,最後由職程來負責執行此任務。

Celery可以有多個職程和中間人,這樣提高了高可用性和橫向的擴充套件能力

Celery由python語言開發,但是該協議可以用任何語言拉力實現,例如:Django中的Celery、node中的node-celery和php中的celery-php

(二)、Django中使用Celery的流程與配置

匯入Celery:pip3 install Celery

在 與專案同名的目錄下 建立celery.py檔案,特別注意:專案同名的目錄下

複製內容到該檔案

修改兩處內容

  • os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')中的proj改為專案名
  • app = Celery('pro')中的pro改為專案名
import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings')

app = Celery('pro')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#  should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings',namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
  print(f'Request: {self.request!r}')

在 與專案同名的目錄下 的__init__.py檔案中新增內容

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

在settings.py檔案中新增配置

  • CELERY_BROKER_URL:中間人url,可以配置redis或者RabbitMQ
  • CELERY_RESULT_BACKEND:返回結果的儲存地址
  • CELERY_ACCEPT_CONTENT:接收內容的格式,分為兩種:json和msgpack。msgpack比json格式的資料體積更小,傳輸速度更快。
  • CELERY_TASK_SERIALIZER:任務載荷的序列化方式-->json
  • CELERY_TIMEZONE
  • CELERY_TASK_TRACK_STARTED:是否開啟任務跟蹤
  • CELERY_TASK_TIME_LIMIT:任務超時限制
# Celery配置
CELERY_BROKER_URL = env("CELERY_BROKER_URL")
CELERY_RESULT_BACKEND = env("CELERY_RESULT_BACKEND")
CELERY_ACCEPT_CONTENT = ["json","msgpack"]
CELERY_TASK_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

在app下建立tasks.py檔案,建立傳送訊息功能,任務方法必須新增裝飾器:@shared_task

from rest_framework.response import Response
from rest_framework.generics import GenericAPIView
from time import sleep
from celery import shared_task

class TestView3(GenericAPIView):

  @classmethod
  @shared_task
  def sleep(self,duration):
    sleep(duration)
    return Response("成功",status=200)

建立檢視和路由

### views.py
from .tasks import TestView3
class TestView1(GenericAPIView):
  def get(self,request):
    TestView3.sleep(10)
    return Response("celery實驗成功")
test_view_1 = TestView1.as_view()

### urls.py
from django.urls import path
from .views import (
  test_view_1
)

urlpatterns = [
  path('celery/',test_view_1,name="test1")
]

安裝redis並啟動

啟動django專案

使用Celery命令啟動Celery服務,命令:celery -A 專案名 worker -l info,如果如下所示則為啟動成功.

[email protected] v5.0.3 (singularity)

Darwin-20.1.0-x86_64-i386-64bit 2020-12-05 20:52:17

[config]
.> app:     drf_email_project:0x7f84a0c4ad68
.> transport:  redis://127.0.0.1:6379/1%20
.> results:   redis://127.0.0.1:6379/2
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery      exchange=celery(direct) key=celery


[tasks]
 . drf_email_project.celery.debug_task
 . users.tasks.sleep

[2020-12-05 20:52:18,166: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1%20
[2020-12-05 20:52:18,179: INFO/MainProcess] mingle: searching for neighbors
[2020-12-05 20:52:19,212: INFO/MainProcess] mingle: all alone
[2020-12-05 20:52:19,248: WARNING/MainProcess] /Users/apple/drf-email/lib/python3.7/site-packages/celery/fixups/django.py:204: UserWarning: Using settings.DEBUG leads to a memory
      leak,never use this setting in production environments!
 leak,never use this setting in production environments!''')

[2020-12-05 20:52:19,249: INFO/MainProces

到此這篇關於Django中使用Celery的方法步驟的文章就介紹到這了,更多相關Django使用Celery的方法步驟內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!