1. 程式人生 > 實用技巧 >Django-Celery-Redis非同步發郵件

Django-Celery-Redis非同步發郵件

Django-Celery-Redis非同步發郵件

python == 3.7.6

django == 2.1.8

celery == 4.4.7

redis == 3.5.3

eventlet == 0.26.1

什麼是celery

Celery是一個功能完備即插即用的任務佇列。它使得我們不需要考慮複雜的問題,使用非常簡單。celery適用非同步處理問題,當傳送郵件、或者檔案上傳, 影象處理等等一些比較耗時的操作,我們可將其非同步執行,這樣使用者不需要等待很久,提高使用者體驗。 celery的特點是:

  • 簡單,易於使用和維護,有豐富的文件。
  • 高效,單個celery程序每分鐘可以處理數百萬個任務。
  • 靈活,celery中幾乎每個部分都可以自定義擴充套件。

celery核心

1、Task

任務(Task)就是你要做的事情,例如一個註冊流程裡面有很多工,給使用者發驗證郵件就是一個任務,這種耗時任務可以交給Celery去處理,還有一種任務是定時任務,比如每天定時統計網站的註冊人數,這個也可以交給Celery週期性的處理。

2、Broker

Broker 的中文意思是經紀人,指為市場上買賣雙方提供中介服務的人。在Celery中它介於生產者和消費者之間經紀人,這個角色相當於資料結構中的佇列。例如一個Web系統中,生產者是處理核心業務的Web程式,業務中可能會產生一些耗時的任務,比如簡訊,生產者會將任務傳送給 Broker,就是把這個任務暫時放到佇列中,等待消費者來處理。消費者是 Worker,是專門用於執行任務的後臺服務。Worker 將實時監控佇列中是否有新的任務,如果有就拿出來進行處理。Celery 本身不提供佇列服務,一般用 Redis 或者 RabbitMQ 來扮演 Broker 的角色

3、Worker

Worker 就是那個一直在後臺執行任務的人,也稱為任務的消費者,它會實時地監控佇列中有沒有任務,如果有就立即取出來執行。

4、Beat

Beat 是一個定時任務排程器,它會根據配置定時將任務傳送給 Broker,等待 Worker 來消費。

5、Backend

Backend 用於儲存任務的執行結果,每個任務都有返回值,比如傳送郵件的服務會告訴我們有沒有傳送成功,這個結果就是存在Backend中,當然我們並不總是要關心任務的執行結果。

Broker選擇

Celery需要一種解決訊息的傳送和接受的方式,我們把這種用來儲存訊息的的中間裝置叫做message broker, 也可叫做訊息中間人。 作為中間人,我們有幾種方案可選擇:

RabbitMQ

RabbitMQ是一個功能完備,穩定的並且易於安裝的broker. 它是生產環境中最優的選擇。

使用RabbitMQ的細節參照以下連結: http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

Redis

Redis也是一款功能完備的broker可選項,但是其更可能因意外中斷或者電源故障導致資料丟失的情況。

Redis作為Broker,可訪下面網址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

將redis釋出訂閱模式用做訊息佇列和rabbitmq的區別:

可靠性

redis:沒有相應的機制保證訊息的可靠消費,如果釋出者釋出一條訊息,而沒有對應的訂閱者的話,這條訊息將丟失, 不會存在記憶體中;rabbbitmq: 具有訊息消費確認機制,如果釋出一條訊息,還沒有消費者消費該佇列,那麼這條訊息將一直存放在佇列中,直到有消費者消費了該條訊息,以此可以保證訊息的可靠消費。

實時性

redis實時性高,redis作為高效的快取伺服器,所有資料都存在在伺服器中,所以它具有更高的實時性消費者負載均衡;rabbitmq佇列可以被多個消費者同時監控消費,但是每一條訊息只能被消費一次,由於rabbitmq的消費確認機制,因此它能夠根據消費者的消費能力而調整它的負載,redis釋出訂閱模式,一個佇列 可以被多個消費者同時訂閱,當有訊息到達時,會將該訊息依次傳送給每個訂閱者;

永續性

redis:redis的持久化是針對 於整個redis快取的內容,它有RDB和AOF兩個持久化方式,可以將整個redis例項持久化到磁碟,以此來做資料備份,防止異常情況下導致資料丟失。rabbitmq:佇列、訊息都可以選擇性持久化,持久化粒度更小,更靈活;佇列監控rabbitmq實現了後臺監控平臺,可以在該平臺上看到所有建立的佇列的詳細情況,良好的臺後管理平臺可以方便我們更好的使用;redis沒有所謂的監控平臺。

總結:

redis:輕量級、低延遲,高併發、低可靠性;

rabbitmq:重量級、高可靠,非同步,不保證實時

在Django中使用celery

django專案名稱:celery_test

在django專案celery_test/celery_test/下建立celery.py檔案,配置以下內容:
# celery.py檔案
import os
from celery import Celery

# 把celery和django進行組合,識別和載入django的配置檔案
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_test.settings')

# 建立celery例項
app = Celery('celery_test')

# 指定celery訊息佇列的配置
app.config_from_object('celery_test.config', namespace='CELERY')

# 從所有的django-app中載入任務
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
在django專案celery_test/celery_test/下建立config.py檔案,配置以下內容:
# 訊息中間人設定
broker_url = 'redis://127.0.0.1:6379/15'
# 結果儲存設定
result_backend = 'redis://127.0.0.1:6379/14'

在django專案celery_test/celery_test/下__init__.py中寫入以下內容:
# 絕對引用,使我們的celery模組不會與原始的celery衝突
from __future__ import absolute_import, unicode_literals
# 加入絕對引入以後,匯入當前模組下的內容方法: from xx import xx as xx
from .celery import app as celery_app

__all__ = ('celery_app',)
在settings.py中配置發郵件的內容:
# 配置郵件傳送
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'  # 如果為163郵箱,設定為smtp.163.com
EMAIL_PORT = 25  # 或者 465/587是設定了 SSL 加密方式
# 傳送郵件的郵箱
EMAIL_HOST_USER = '3735***[email protected]'
# 在郵箱中設定的客戶端授權密碼
EMAIL_HOST_PASSWORD = 'xxxxxxxxxxxxx'  # 第三方登陸使用的授權密碼
EMAIL_USE_TLS = True  # 這裡必須是 True,否則傳送不成功
# 收件人看到的發件人, 必須是一直且有效的
EMAIL_FROM = '海上明月<3735***[email protected]>'
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
在django的app中建立tasks.py檔案,在這裡寫入非同步任務內容:
# 絕對引用,使我們的celery模組不會與原始的celery衝突
from __future__ import absolute_import, unicode_literals
# 匯入原始的celery模組中shared_task    from xx import xx
from celery import shared_task  
# 使用django內建函式傳送郵件
from django.core.mail import send_mail
# 匯入django的settings
from django.conf import settings  


@shared_task
def send_mail_task(usernaem, email, token):
    """
    使用django內建函式傳送郵件
    """
    subject = "海上明月"
    message = ""
    sender = settings.EMAIL_FROM
    recipient = [email]
    html_message = "<h1>{},歡迎您註冊,請點選以下連結進行啟用郵箱:<a href='http://127.0.0.1:8000/api/v1/active/{}'>點選這裡進行啟用</a></h1>".format(username, token)
    send_mail(subject, message, sender, recipient, html_message=html_message)
在django的app下的views.py中呼叫tasts下的任務
from django.views import View
from django.http import JsonResponse
from .tasks import send_mail_task
from itsdangerous import TimedJSONWebSignatureSerializer
from django.conf import settings

# 傳送郵件
token_serializer = TimedJSONWebSignatureSerializer(settings.SECRET_KEY, 600)


class SendMailView(View):
    def post(self, request):
        username = 'Edward'
        email = '3706***[email protected]'
        user_info = {'user_id': '1'}
        token = token_serializer.dumps(user_info).decode()
        send_mail_task.delay(username, email, token)
        return JsonResponse({'msg': 'OK'})

郵件傳送成功以後,通過get方式獲取使用者的token

from django.views import View
from django.http import JsonResponse
from itsdangerous import TimedJSONWebSignatureSerializer, SignatureExpired
from django.conf import settings

# 傳送郵件
token_serializer = TimedJSONWebSignatureSerializer(settings.SECRET_KEY, 600)


class ActiveView(View):
    """
    啟用使用者
    """
    def get(self, request, token):       
        try:
            user_info = token_serializer.loads(token)            
            return JsonResponse({'msg': '啟用成功', 'code': 200})
        except SignatureExpired:
            return JsonResponse({'msg': '啟用連結失效,請重新發送郵件進行啟用', 'code': 404})
        

啟動celery

開啟pycharm下的Terminal,進入到專案根目錄下,使用eventlet啟動celery

celery -A celery_test worker -l info -P eventlet