1. 程式人生 > >探究flask中的celery後臺任務

探究flask中的celery後臺任務

目錄

專案的開發經常用到celery後臺任務,下面記錄下我在專案中用到celery的地方以及自己的一些思考。

使用場景

  1. 在web開發時有一些耗時的操作需要執行,但是你又不想一直阻塞前端,那麼可以嘗試使用celery的後臺任務,將請求傳送到celery後臺,然後前端不再阻塞,最後celery後臺將任務完成之後將結果返回。使用者就可以知道任務是否執行成功。
  2. 做資料庫非同步更新,可能要在某個時間點與遠端資料庫進行同步,更新的時間也比較久,不希望阻塞前端的程序。

程式碼編寫

首先來看一下一個簡單的celery後臺任務。專案裡的部署任務是一個耗時的函式,使用者在前端點選了一個部署按鈕之後可以執行其他操作,最後會將部署的結果返回給使用者,更新任務狀態。這裡的修飾器是傳入了celery任務的上下文。

@celery_ext.celery.task(base=RequestContextTask, ignore_result=True)
def deploy(**kwargs):
    """執行部署任務.

    :param task_id: 任務 id
    :param update_task_state_url: 更新任務狀態通知的 URL
    """
    # 更新任務狀態
    # 執行作業系統安裝

在全域性flask框架入口init,一般在這個檔案裡我們建立全域性的flask物件,然後進行外掛初始化,並且註冊檢視和藍圖。我們初始化了celery_ext,進行外掛的初始化

celery_ext = FlaskCeleryExt()
...
def create_app(config)
    ...
    celery_ext.init_app(app)
    ...

再來看下FlaskCeleryExt這個類,主要做的是物件一些初始化工作

class FlaskCeleryExt(object):
    """Flask-Celery extension."""

    def __init__(self, app=None, create_celery_app=None):
        """Initialize the Flask-CeleryExt."""
self.create_celery_app = create_celery_app or pkg_create_celery_app self.celery = None if app is not None: self.init_app(app) def init_app(self, app): """Initialize a Flask application.""" # Follow the Flask guidelines on usage of app.extensions if not hasattr(app, 'extensions'): app.extensions = {} # pragma: no cover app.extensions['flask-celeryext'] = self self.celery = self.create_celery_app(app)

當然啦,還有一些配置的工作。我們一般將全域性的配置檔案寫在config.py裡面。這裡設定了celery broker url 以及一些後臺任務的屬性配置。也可以在設定BACKEND的URL。我們在setup app的時候會用到這些配置的引數。

class Config(object):
    """框架配置
    配置 Flask 所必需的引數,其它系統引數應儲存到資料庫
    """
    # celery
    CELERY_BROKER_URL = 'amqp://guest:[email protected]:5672//'
    CELERY_TASK_ALWAYS_EAGER = False
    CELERY_TASK_EAGER_PROPAGATES_EXCEPTIONS = True

manage.py中新增啟動命令列

@manager.option('-l', '--loglevel', dest='loglevel', default='info')
@manager.option('-c', '--concurrency', dest='concurrency', type=int, default=2)
def worker(loglevel='info', concurrency=2):
    """啟動後臺的 celery worker"""
    from celery.apps.worker import Worker

    worker_entry = Worker(app=celery, loglevel=loglevel, concurrency=concurrency)
    worker_entry.start()

最後在執行的時候,命令列需要啟動rabbitmq以及celery worker。

# 執行rabbitmq
    sudo service rabbitmq start

# 執行celery worker
    celery worker -A manage.celery --loglevel=info

底層實現

  1. python修飾器
    修飾器本身其實是一個python函式,他可以讓其他函式在不需要做任何程式碼改動的前提下增加額外的功能。引數可以是一個函式,返回值也可以是一個函式,好處在於不需要修改原來函式的程式碼並且為他新增新的功能,也能使一些常用的函式程式碼模組得到重用,減少冗餘度。另外修飾器可以進行疊加。celery用到修飾器的地方是設定celery後臺任務的上下文。

  2. celery實現
    celery是python多工庫來編寫的任務佇列工具,可以並行地執行一些任務。
    python與celery之間使用了訊息佇列作為中間人來進行任務佇列的管理。
    中介軟體的選擇可以是rabbitmq也可以是redis,看開發的需求吧。我們專案用的是rabbitmq。
    大致的一個邏輯圖是這樣子的:
    這裡寫圖片描述
    worker是執行單元,其實也就是多個執行緒。
    broker是訊息中介軟體,用於接收並暫存訊息佇列。
    backend是儲存每次任務的結果。

相關推薦

探究flaskcelery後臺任務

目錄 目錄 使用場景 程式碼編寫 底層實現 專案的開發經常用到celery後臺任務,下面記錄下我在專案中用到celery的地方以及自己的一些思考。 使用場景 在web開發時有一些耗時的操作需要執行,但是你又不想一直阻塞前端,那麼可以

flaskcelery介紹及使用celery實現非同步任務

Celery簡介 除Celery是一個非同步任務的排程工具。 Celery 是 Distributed Task Queue,分散式任務佇列,分散式決定了可以有多個 worker 的存在,隊列表示其是非同步操作,即存在一個產生任務提出需求的工頭,和一群等著

藍鯨平臺Celery後臺任務使用配置正確姿勢

目錄結構: 首先,celery_tasks要放到根目錄下否則會報: [2018-11-04 16:56:17,616: ERROR/MainProcess] Received unregistered

藍鯨平臺Celery後臺任務進度獲取程式碼

celery_tasks.py # -*- coding: utf-8 -*- """ Tencent is pleased to support the open source community by making 藍鯨智雲(BlueKing) availa

flask 訪問時後臺錯誤 error: [Errno 32] Broken pipe

默認 ken 解決辦法 分享圖片 error RR nbsp alt pipe 解決辦法:app.run(threaded=True) 個人理解:flask默認單線程,訪問一個頁面時會訪問到很多頁面,比如一些圖片,加入參數使其為多線程flask 中訪問時後臺錯誤 er

python之celeryflask使用

.org python3 單獨 per ext 由於 不支持 需要 dev 現在繼續學習在集成的框架中如何使用celery. 在Flask中使用celery 在Flask中集成celery需要做到兩點: 創建celery的實例對象的名字必須是flask應用程序app的名字

Flask 使用 Celery

在 Flask 中使用 Celery 原文地址:http://www.pythondoc.com/flask-celery/first.html 後臺執行任務的話題是有些複雜,因為圍繞這個話題會讓人產生困惑。為了簡單起見,在以前我所有的例子中,我都是線上程中執行後臺任務,但是我

python celery捕捉unicode字元型別的錯誤,導致後臺任務失敗的解決方案

背景 公司有一個用django(1.8.0)寫的運維平臺,目的用於申請阿里雲和騰訊雲機器;申請雲上機器採用後臺非同步的方式,框架採用redis+celery(3.1.18),但最近發現一個問題,就是有時候申請騰訊雲機器的後臺任務因為沒有捕捉到某些異常,導致任務會

Gunicorn+Flask重啟啟動後臺執行緒問題

假設程式如下: 1 if __name__ == '__main__': 2 t = Thread(target=test) 3 t.start() 4 app.run(host='0.0.0.0',port=8080,debug=False)     gunicor

談談.NET Core基於Generic Host來實現後臺任務

目錄 前言 很多時候,後臺任務對我們來說是一個利器,幫我們在後面處理了成千上萬的事情。 在.NET Framework時代,我們可能比較多的就是一個專案,會有一到多個對應的Windows服務,這些Windows服務就可以當作是我們所說的後臺任務了。 我喜歡將後臺任務分為兩大類,一類是不停的跑,好比MQ的消費

Celery分散式任務佇列框架--基於flask實現

使用Celery的方法 Celery是分散式的任務佇列 特點: 簡單、靈活、高可用 1) 安裝Celery pip install celery 2) 安裝 redis redis可以使用list結構,提供訊息佇列的功能 3)建立Celery物件

ASP.NET Core MVC應用程式後臺工作計劃任務

  在應用程式的記憶體中快取常見資料(如查詢)可以顯著提高您的MVC Web應用程式效能和響應時間。當然,這些資料必須定期重新整理。   當然你可以使用任何方法來更新資料,例如Redis中就提供了設定快取物件的生命時間,那麼對於這種單物件的更新的做法我覺得是不符合我的程式設計習慣的,我們可以使用Quar

解決Xshelljobs命令不顯示後臺任務

今天遇到一個關於xshell連線伺服器jobs命令的坑。這個jobs命令只能顯示當前控制檯創造的任務。反正在我這裡,無法顯示別人的後臺任務,輸入jobs沒有任何反應。 [[email protected] ~]# jobs [[email protect

win7使用django-celery定時任務配置

1. 使用RabbitMQ 代理(broker)在客戶端和工作執行者之間進行互動http://www.rabbitmq.com/install-windows.html安裝RabbitMQ 必須先下載安裝了erlang 才可以安裝RabbitMQ 2.使用pip安裝:pip

【5min+】後臺任務的積木。.NetCore的IHostedService

系列介紹 【五分鐘的dotnet】是一個利用您的碎片化時間來學習和豐富.net知識的博文系列。它所包含了.net體系中可能會涉及到的方方面面,比如C#的小細節,AspnetCore,微服務中的.net知識等等。 5min+不是超過5分鐘的意思,"+"是知識的增加。so,它是讓您花費5分鐘以

MVC根據後臺絕對路徑讀取圖片並顯示在IMG

copy ret execute ble 操作文件 初學者 實體類 cin jpeg 數據庫存取圖片並在MVC3中顯示在View中 根據路徑讀取圖片: 1 byte[] img = System.IO.File.ReadAllBytes(@"d:\x

java定時執行任務

views sch start bsp tails pri ted java file 現在項目中用到需要定時去檢查文件是否更新的功能。timer正好用於此處。 用法很簡單,new一個timer,然後寫一個timertask的子類即可。 代碼如下: [java] vie

linux後臺運行和關閉、查看後臺任務

ctrl+z ctrl+c .com nat 執行 opp 後臺進程 查看 用戶 fg、bg、jobs、&、nohup、ctrl+z、ctrl+c 命令 一、& 加在一個命令的最後,可以把這個命令放到後臺執行,如 watch -n 10 sh

flask返回requests響應

cnblogs led form status redirect 壓縮 rom out 自動跳轉   在flask服務端,有時候需要使用requests請求其他url,並將響應返回回去。查閱了flask文檔,About Responses,可以直接構造響應結果進行返回。  

Spring Boot 配置定時任務,實現多線程操作

pre log pri http code china 部分 多線程操作 .net 參考的代碼部分 https://git.oschina.net/jokerForTao/spring_boot_schedule 一目了然!Spring Boot 中配置定時任務,實現