1. 程式人生 > 其它 >python測試開發django-159.Celery 非同步與 RabbitMQ 環境搭建

python測試開發django-159.Celery 非同步與 RabbitMQ 環境搭建

前言

Celery是一個Python任務佇列系統,用於處理跨執行緒或網路節點的工作任務分配。它使非同步任務管理變得容易。
您的應用程式只需要將訊息推送到像RabbitMQ這樣的代理,Celery worker會彈出它們並安排任務執行。

Celery

celery 的5個角色

  • Task 就是任務,有非同步任務(Async Task)和定時任務(Celery Beat)
  • Broker 中間人,接收生產者發來的訊息即Task,將任務存入佇列。任務的消費者是Worker。Celery 本身不提供佇列服務,推薦用Redis或RabbitMQ實現佇列服務。
  • Worker 執行任務的單元,它實時監控訊息佇列,如果有任務就獲取任務並執行它。
  • Beat 定時任務排程器,根據配置定時將任務傳送給Broker。
  • Backend 用於儲存任務的執行結果。

環境準備

1.django環境v2.1.2
2.安裝celery版本

pip install celery==3.1.26.post2

3.安裝django-celery包

pip install django-celery==3.3.1

RabbitMQ 環境

Broker(RabbitMQ) 負責建立任務佇列,根據一些路由規則將任務分派到任務佇列,然後將任務從任務佇列交付給 worker
先使用docker 搭建RabbitMQ 環境,rabbitMQ 映象倉庫地址 https://hub.docker.com/_/rabbitmq

找帶有 mangement的版本,會帶web後臺管理介面

下載 3.8.0-management 映象

docker pull rabbitmq:3.8.0-management

啟動容器,設定賬號 admin 和密碼 123456

docker run -d --name rabbitmq3.8 -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.0-management

宿主機需開放 5672 和 15672 這 2 個埠,5672 是後端介面訪問的埠,15672 是前端 web 管理後臺頁面地址,輸入http://ip:15672

可以訪問 web 網站

輸入前面設定的賬號 admin 和密碼 123456 可以直接登入

Django 中使用 Celery

要在 Django 專案中使用 Celery,您必須首先定義 Celery 庫的一個例項(稱為“應用程式”)

如果你有一個現代的 Django 專案佈局,比如:

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

那麼推薦的方法是建立一個新的proj/proj/celery.py模組來定義 Celery 例項:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

其中debug_task是測試的任務,可以注掉

# @app.task(bind=True)
# def debug_task(self):
#     print('Request: {0!r}'.format(self.request))

上面一段只需改這句,'proj'是自己django專案的app名稱

app = Celery('proj')

然後你需要在你的proj/proj/__init__.py 模組中匯入這個應用程式。這確保在 Django 啟動時載入應用程式,以便@shared_task裝飾器(稍後提到)將使用它:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

上面這段固定的,不用改

tasks任務

在app下新建tasks.py,必須要是tasks.py檔名稱,django會自動查詢到app下的該檔案


@shared_task
def add(x, y):
    print("task----------1111111111111111111111")
    return x + y


@shared_task
def mul(x, y):
    return x * y

tasks.py可以寫任務函式add、mul,讓它生效的最直接的方法就是新增app.task 或shared_task 這個裝飾器

新增setting配置

setting.py新增配置

  • broker引數表示用來連線broker的URL,rabbitmq採用的是一種稱為’amqp’的協議,如果rabbitmq執行在預設設定下,celery不需要其他資訊,只要amqp://即可。
  • backend引數是可選的,如果想要查詢任務狀態或者任務執行結果時必填, Celery中的後端用於儲存任務結果。rpc意味著將結果作為AMQP訊息傳送回去。
#   RabbitMQ配置BROKER_URL 和backend
BROKER_URL = 'amqp://admin:[email protected]:5672//'
CELERY_RESULT_BACKEND = 'rpc://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

建立檢視

views.py建立檢視

from .tasks import add, mul

def task_demo(request):
    res = add.delay(10, 20)
    print(res.task_id)  # 返回task_id
    return JsonResponse({"code": 0, "res": res.task_id})

啟動worker

前面pip已經安裝過celery應用了,celery是一個獨立的應用,可以啟動worker

celery -A MyDjango worker -l info

其中MyDjango是你自己的django專案名稱

執行日誌

 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         yoyo:0x1ea1a96e9b0
- ** ---------- .> transport:   amqp://admin:**@192.168.1.11:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . yoyo.tasks.add
  . yoyo.tasks.mul

[2021-10-18 22:45:03,155: INFO/MainProcess] Connected to amqp://admin:**@192.168.1.11:5672//
[2021-10-18 22:45:03,347: INFO/MainProcess] mingle: searching for neighbors
[2021-10-18 22:45:04,897: INFO/MainProcess] mingle: all alone
[2021-10-18 22:45:05,406: WARNING/MainProcess] e:\python36\lib\site-packages\celery\fixups\django.py:265: 
UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2021-10-18 22:45:05,407: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.

執行的時候,當我們看到" Connected to amqp"說明已經連線成功了!

shell互動環境

在django shell互動環境除錯執行任務

D:\202107django\MyDjango>python manage.py shell
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from yoyo.tasks import add,mul
>>> from celery.result import AsyncResult
>>>
>>> res = add.delay(11, 12)
>>> res
<AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
>>> res.status
'SUCCESS'
>>>
>>> res.backend
<celery.backends.redis.RedisBackend object at 0x0000015E011C3128>
>>>
>>> res.task_id
'c5ff83a4-4840-4b36-8869-5ce6081904f1'
>>>
>>>
>>> get_task = AsyncResult(id=res.task_id)
>>> get_task
<AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
>>> get_task.result
23
>>>

res.status是檢視任務狀態
res.task_id 是獲取任務的id
res.result 獲取任務結果
根據任務的id查詢任務的執行結果AsyncResult(id=res.task_id).result獲取