1. 程式人生 > 程式設計 >使用celery和Django處理非同步任務的流程分析

使用celery和Django處理非同步任務的流程分析

介紹

我們可能需要一些可以安排一些任務並定期執行一些任務或非同步處理長任務的東西,而這一切都可以通過在Django Project中使用Celery來實現。

什麼是Celery?

Celery是 一個專注於實時處理的任務佇列,它還支援任務排程。 Celery快速,簡單,高度可用且靈活。

Celery需要訊息傳輸來發送和接收訊息,這可以由Redis或RabbitMQ完成。

入門

讓我們開始在您的virtualenv中安裝Celery軟體包。

安裝Celery

<span class="nv">$ </span>pip <span class="nb">install </span>celery
pip install celery

安裝Redis

我們將Message Broker用作Redis,所以我們安裝

Linux / Mac使用者

您可以從這裡下載最新版本

$ wget http://download.redis.io/releases/redis-4.0.8.tar.gz
$ tar xzf redis-4.0.8.tar.gz
$ cd redis-4.0.8
$make

Windows使用者

對於Windows使用者,您可以從此處獲取redis的可執行檔案。

安裝後,請嘗試是否正確安裝。

$ redis-cli ping

它應該顯示:

pong

同時安裝redis的python包

$ pip install redis

Django的第一步

現在您已經成功安裝了軟體包,現在就開始學習Django Project

settings.py Add some of the setting configuration in your settings.py CELERY_BROKER_URL = 'redis://localhost:6379' CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = "YOUR_TIMEZONE"

確保您已從YOUR_TIMEZONE更改時區。 您可以從這裡獲取時區

主Django專案目錄中建立celery.py檔案

- src/ - manage.py - celery_project/ - __init__.py - settings.py - urls.py - celery.py celery_project/celery.py

在celery.py模組中新增以下程式碼。 該模組用於定義celery例項。

確保已使用django專案名稱更改了專案名稱(<your project name>)

from __future__ import absolute_import,unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','<your project name>.settings')
app = Celery('<your project name>')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings',namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
 print('Request: {0!r}'.format(self.request))
celery_project/__init__.py

然後,我們需要將定義celery.py的應用程式匯入到主專案目錄的__init__.py。 這樣,我們可以確保在Django專案啟動時已載入應用

from __future__ import absolute_import,unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app'] 

建立任務

現在建立一些任務

在您在INSTALLED_APPS中註冊的任何應用程式中建立一個新檔案

my_app/tasks.py
from __future__ import absolute_import,unicode_literals
from celery import shared_task
@shared_task(name = "print_msg_with_name")
def print_message(name,*args,**kwargs):
 print("Celery is working!! {} have implemented it correctly.".format(name))
@shared_task(name = "add_2_numbers")
def add(x,y):
 print("Add function has been called!! with params {},{}".format(x,y))
 return x+y

開始程式

開啟一個NEW終端並執行以下命令以執行celery的worker例項,並將目錄更改為您的主專案目錄所在的位置,即,將manage.py檔案放置的目錄,並確保您已經 啟用您的virtualenv(如果已建立)。

用您的專案名稱更改專案名稱

$ celery -A <your project name> worker -l info

輸出:

-------------- celery@root v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-4.13.0-32-generic-x86_64-with-Ubuntu-17.10-artful 2018-02-17 08:09:37 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: celery_project:0x7f9039886400 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost:6379/ - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . add_2_numbers . celery_project.celery.debug_task . print_msg_with_name [2018-02-17 08:09:37,877: INFO/MainProcess] Connected to redis://localhost:6379// [2018-02-17 08:09:37,987: INFO/MainProcess] mingle: searching for neighbors [2018-02-17 08:09:39,084: INFO/MainProcess] mingle: all alone [2018-02-17 08:09:39,121: WARNING/MainProcess] /home/jai/Desktop/demo/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak,never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak,never ' [2018-02-17 08:09:39,121: INFO/MainProcess] celery@root ready.

注意:檢查上面的[tasks],它應該包含您在task.py模組中建立的任務的名稱。

有關更多資訊和日誌,您還可以在DEBUG MODE中執行worker例項

celery <span class="nt">-A</span> <your project name> worker <span class="nt">-l</span> info <span class="nt">--loglevel</span><span class="o">=</span>DEBUG celery -A <your project name> worker -l info --loglevel=DEBUG

注意:請勿關閉此終端,應保持開啟狀態!!

測試任務

現在讓我們從django shell執行任務開啟Django shell

$ python3 manage.py shell

用delay方法執行函式:

>>> from my_app.tasks import print_message,add
>>> print_message.delay("Jai Singhal")
<AsyncResult: fe4f9787-9ee4-46da-856c-453d36556760>
>>> add.delay(10,20)
<AsyncResult: ca5d2c50-87bc-4e87-92ad-99d6d9704c30>

當檢查您的celery worker例項正在執行的第二個終端時,您將獲得此型別的輸出,顯示您的任務已收到且任務已成功完成

[2018-02-17 08:12:14,375: INFO/MainProcess] Received task: my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760] [2018-02-17 08:12:14,377: WARNING/ForkPoolWorker-4] Celery is working!! Jai Singhal have implemented it correctly. [2018-02-17 08:12:14,382: INFO/ForkPoolWorker-4] Task my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760] succeeded in 0.004476275000342866s: None [2018-02-17 08:12:28,344: INFO/MainProcess] Received task: my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30] [2018-02-17 08:12:28,349: WARNING/ForkPoolWorker-3] Add function has been called!! with params 10,20 [2018-02-17 08:12:28,358: INFO/ForkPoolWorker-3] Task my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30] succeeded in 0.010077004999857309s: 30

總結

以上所述是小編給大家介紹的使用celery和Django處理非同步任務的流程分析,希望對大家有所幫助,也非常感謝大家對我們網站的支援!