1. 程式人生 > 其它 >【luogu P6657】【模板】LGV 引理(行列式)(數學)(線性代數)

【luogu P6657】【模板】LGV 引理(行列式)(數學)(線性代數)

一、Celery架構介紹

Celery是分散式非同步任務框架,Celery的架構由三部分組成,訊息中介軟體(message broker)、任務執行單元(worker)和 任務執行結果儲存(task result store)組成。

訊息中介軟體

Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,RabbitMQ, Redis等等

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的執行在分散式的系統節點中。

任務結果儲存

Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis等

"""
1)可以不依賴任何伺服器,通過自身命令,啟動服務(內部支援socket)
2)celery服務為為其他專案服務提供非同步解決任務需求的
注:會有兩個服務同時執行,一個是專案服務,一個是celery服務,專案服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成專案的需求

人是一個獨立執行的服務 | 醫院也是一個獨立執行的服務
	正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
	人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立執行,人生病時,醫院就來解決人生病的需求
"""

1、使用場景

非同步執行:解決耗時任務,將耗時操作任務提交給Celery去非同步執行,比如傳送簡訊/郵件、訊息推送、音視訊處理等等

延遲執行:解決延遲任務
定時執行:解決週期(週期)任務,比如每天資料統計

二、Celery簡單使用

pip install celery==5.1.2  # 安裝

1、使用

# 使用(舉例,假設放在s1.py檔案上)
from celery import Celery

# broker = 'redis://:密碼@127.0.0.1:6379/1'  # -有密碼的寫法
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址


# 例項化得到celery物件,第一個引數是起個名字的意思
app = Celery(__name__, backend=backend, broker=broker)


# 使用裝飾器包裹任務,將任務管理起來
@app.task()  # 寫一個任務函式(示例,實際情況下任務應該單獨建立檔案)
def add(x, y):
    return x + y

#############################################################

# 任務提交(在另外一個s2.py檔案提交任務)
import s1

# 非同步執行
# 第一步:提交(使用的任務名.apply_async(引數))
res = s1.add.apply_async(args=[2, 3])
print(res)  # 會返回一個任務id號,唯一標識

#############################################################

# 第二步:讓worker執行-->結果存在redis,使用命令啟動

# 【非Windows】
celery worker -A s1檔名 -l info

# 【Windows】需要安裝一個模組 pip3 install eventlet
celery worker -A 檔名 -l info -P eventlet  # 5.X前的啟動命令
celery -A 檔名 worker -l info -P eventlet  # 5.X的啟動命令

#############################################################

# 第三步:檢視任務結果(放在任意檔案檢視,如s3.py)
from s1 import app
from celery.result import AsyncResult

id = '1b814481-08aa-484f-a1f9-3952a762df9a'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任務失敗')
    elif async.status == 'PENDING':
        print('任務等待中被執行')
    elif async.status == 'RETRY':
        print('任務異常後正在重試')
    elif async.status == 'STARTED':
        print('任務已經開始被執行')

三、celery包解構

# 目錄結構
-celery_task               # 包名
	__init__.py
	celery.py             # app所在py檔案
	course_task.py        # 任務
	order_task.py         # 任務
	user_task.py          # 任務
提交任務.py                # 提交任務
檢視結果.py                # 檢視結果

1、執行非同步任務

"""首先是celery.py檔案:存放celery的使用"""

from celery import Celery

# broker = 'redis://:密碼@127.0.0.1:6379/1'  # -有密碼的寫法
broker = 'redis://127.0.0.1:6379/1'  # redis地址
backend = 'redis://127.0.0.1:6379/2'  # redis地址


"""
例項化得到celery物件,第一個引數是起個名字的意思,第二和第三個引數對應上面的redis地址,
第三個引數include是一個列表,放被管理的task的py檔案
"""
app = Celery(__name__, backend=backend, broker=broker, include=[
    'celery_task.user_task',
    'celery_task.order_task',
])

##################################################################################

"""然後就是user_task.py這些任務檔案,舉個例子,模擬一個傳送簡訊驗證碼的任務"""
from .celery import app

@app.task()  # 裝飾器管理任務
def send_sms(phone, code):
    import time
    time.sleep(3)  # 模擬傳送簡訊延遲
    print('簡訊傳送成功,手機號是:%s,驗證碼是:%s' % (phone, code))
    return '簡訊傳送成功'

##################################################################################

"""然後就是提交我們所寫的任務,放在提交任務.py檔案中,然後執行該檔案,再去啟動worker命令"""
from celery_task import user_task

# 提交一個簡訊任務
a = user_task.send_sms.apply_async(args=(137000, 4691))  # 使用位置傳參
# a = user_task.send_sms.apply_async(kwargs={'x': 137000, 'y': 4691})  也可以使用關鍵字傳參
# a = user_task.send_sms.delay('137000','4691')  也可以寫delay這種方式,可以直接傳參
print(a)  # 會以字串形式返回一個任務id號,可以用來檢視任務結果

###################################################################################

執行worker命令後,提交一次任務,worker就執行一次,如果沒有任務就會一直阻塞著,等待下一次任務執行。(執行worker記得把位置切到需要啟動的包的上一層資料夾裡)

# 【非Windows】
celery worker -A 檔名 -l info

# 【Windows】需要安裝一個模組 pip3 install eventlet
celery worker -A 檔名 -l info -P eventlet  # 5.X前的啟動命令
celery -A 檔名 worker -l info -P eventlet  # 5.X的啟動命令

# 檔名就是目錄結構的包名

###################################################################################

"""檢視結果.py檔案,用於檢視任務執行的結果,下面基本就是固定程式碼"""
from celery_task.celery import app  # 匯入celery檔案下的app物件
from celery.result import AsyncResult

id = '1b814481-08aa-484f-a1f9-3952a762df9a'  # 使用提交任務返回的字串,來檢視結果
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任務失敗')
    elif async.status == 'PENDING':
        print('任務等待中被執行')
    elif async.status == 'RETRY':
        print('任務異常後正在重試')
    elif async.status == 'STARTED':
        print('任務已經開始被執行')

2、執行延遲任務

拿上面user_task的簡訊任務為例,將簡訊任務延遲10秒之後執行,所以直接在 "提交任務.py"下寫

from celery_task import user_task  # 匯入celery_task包下的user_task簡訊任務檔案
from datetime import datetime, timedelta  # 匯入關於時間的模組

# utcnow當前utc時間
eta = datetime.utcnow() + timedelta(seconds=10)  # 這句程式碼的意思是 當前utc時間的10秒後

# 10s之後傳送簡訊
res = user_task.send_sms.apply_async(args=(137000, 4691),eta=eta)
print(res)

3、執行定時任務

"""對定時任務進行配置,放在celery.py裡"""

# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任務的定時配置
from datetime import timedelta
from celery.schedules import crontab  # 這個模組有各種指定任務時間的命令
app.conf.beat_schedule = {
    'send_sms_every_3_seconds': {  # 給配置起名字
        'task': 'celery_task.user_task.send_sms',  # 指定執行的任務
        'schedule': timedelta(seconds=3),  # 執行任務內容,例如這個是每三秒傳送一次簡訊
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點
        'args': (137000, 4691),  # 傳引數,如果任務沒有引數,就不用寫
    }
}

"""
這時候worker啟動後並沒有執行,是因為我們之前執行任務是手動提交任務,為了能自動提交任務,我們需要啟動beat
再開一個終端,不要把啟動worker的終端頂掉了,然後輸入命令啟動beat
"""
celery beat -A celery_task -l  # 5.X之前的命令
celery -A celery_task  beat -l info  # 5.X之後的命令

四、Django整合Celery

# 瞭解性質內容:django-celery,一個把django和celery整合起來的第三方模組,但是第三方寫的包的版本,需要跟celery和django版本完全對應。略顯麻煩,所以我們不用
    	
# 我們自己使用包結構整合到django中

# 第一步,把寫好的包,直接複製到專案根路徑

# 第二步,在檢視類/函式中
from celery_task.user_task import send_sms  # 匯入想要使用的功能

def test(request):
    mobile = request.GET.get('mobile')
    code = '9999'
    res = send_sms.delay(mobile, code)  # 同步傳送假設3分支鍾,非同步傳送,直接就返回id了,後期通過id查詢傳送是否成功
    print(res)
    return HttpResponse(res)

# 啟動worker,訪問http://127.0.0.1:8000/test/?mobile=1111,帶著引數,就訪問成功了,後臺的終端就會顯示 簡訊傳送成功,手機號是:1111,驗證碼是:9999

1、django整合celery實現定時任務

1-1、定時更新首頁輪播圖

我們設想這樣一個場景,一個網站的首頁可能會有 海報輪播圖,這個輪播圖通常很長時間都不變,如果使用者每次訪問首頁,網站就得去查詢一次資料庫,對資料庫的壓力非常大

這時候我們可以使用Redis快取,以後使用者訪問首頁,只需要第一次訪問時從資料庫中獲取資料放入Redis中,之後的訪問都是直接從Redis中獲取資料,減少了資料庫查詢的次數

class BannerView(ViewSetMixin, ListAPIView):
    queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.BANNER_COUNT]
    serializer_class = serializer.BannerSerializer

    def list(self, request, *args, **kwargs):
        # 先去快取中獲取,如果快取有,直接返回,如果沒有,去資料庫查詢,放到快取

        # 先去快取中獲取
        banner_list = cache.get('banner_list_cache')
        if not banner_list:  # 去資料庫中獲取
            # 沒有走快取
            print('查了資料庫')
            res = super().list(request, *args, **kwargs)
            banner_list = res.data  # res是Response物件
            # 放入到快取中
            cache.set('banner_list_cache', banner_list)
        return Response(data=banner_list)

1-2、解決雙寫一致性問題

雖然用Redis快取很方便,可以有效解決資料庫壓力,但這種方法也會伴隨著一定問題,比如說:雙寫一致性問題(redis快取和mysql資料不同步),或者快取穿透,快取擊穿,快取雪崩等問題,不過我們也有相應的解決辦法

# 雙寫一致性問題:是快取資料庫更新的策略,資料庫的資料更新了,但是Redis快取沒更新,確怎麼解決

# 快取更新策略
	-先刪除快取,在更新資料庫
	-先更新資料庫。再更新快取(可靠性高)
    
    -定時更新(對實時性要求不高)
    	-可以設定為某個時間段,自動更新快取

1-2-1、home_task.py

from home import serializer
from .celery import app
from apps.home import models
from django.conf import settings
from django.core.cache import cache

@app.task()
def update_banner():
    # 從mysql中取出輪播圖資料
    queryset = models.Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
               :settings.BANNER_COUNT]

    # 2 序列化
    ser = serializer.BannerSerializer(instance=queryset, many=True)
    # 3 獲取到字典,手動拼上前面的地址
    banner_list = ser.data
    for banner in banner_list:
        banner['image'] = settings.BACKEND_URL % str(banner['image'])

    # 4 放到快取中
    cache.set('banner_list_cache', banner_list)
    return True

1-2-2、celery.py

from celery import Celery

# 由於celery和django 是獨立的兩個服務,要想在celery服務中使用django,必須加這兩句
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

# 由於celery和django 是獨立的兩個服務,要想在celery服務中使用django,必須加這兩句
import os

# os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include  是一個列表,放被管理的task 的py檔案
app = Celery(__name__, backend=backend, broker=broker, include=[
    'celery_task.gagaluansha',
    'celery_task.user_task',
])

# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 任務的定時配置
from datetime import timedelta

app.conf.beat_schedule = {
    'update_banner_every_3_seconds': {  # 給配置起名字
        'task': 'celery_task.home_task.update_banner',  # 指定執行的任務
        'schedule': timedelta(seconds=3),  # 執行任務內容,例如這個是每三秒傳送一次簡訊
    }
}

1-3-3、啟動djagno,啟動beat,啟動worker

python manage.py runserver 
celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet