1. 程式人生 > >在 Flask 中使用 Celery

在 Flask 中使用 Celery

在 Flask 中使用 Celery

原文地址:http://www.pythondoc.com/flask-celery/first.html

這裡寫圖片描述

後臺執行任務的話題是有些複雜,因為圍繞這個話題會讓人產生困惑。為了簡單起見,在以前我所有的例子中,我都是線上程中執行後臺任務,但是我一直注意到更具有擴充套件性以及具備生產解決方案的任務佇列像 Celery 應該可以替代執行緒中執行後臺任務。

不斷有讀者問我關於 Celery 問題,以及怎樣在 Flask 應用中使用它,因此今天我將會向你們展示兩個例子,我希望能夠覆蓋大部分的應用需求。

什麼是 Celery?

Celery 是一個非同步任務佇列。你可以使用它在你的應用上下文之外執行任務。總的想法就是你的應用程式可能需要執行任何消耗資源的任務都可以交給任務佇列,讓你的應用程式自由和快速地響應客戶端請求。

使用 Celery 執行後臺任務並不像線上程中這樣做那麼簡單。但是好處多多,Celery 具有分散式架構,使你的應用易於擴充套件。一個 Celery 安裝有三個核心元件:

  1. Celery 客戶端: 用於釋出後臺作業。當與 Flask 一起工作的時候,客戶端與 Flask 應用一起執行。
  2. Celery workers: 這些是執行後臺作業的程序。Celery 支援本地和遠端的 workers,因此你就可以在 Flask 伺服器上啟動一個單獨的 worker,隨後隨著你的應用需求的增加而新增更多的 workers。
  3. 訊息代理: 客戶端通過訊息佇列和 workers 進行通訊,Celery 支援多種方式來實現這些佇列。最常用的代理就是
    RabbitMQ
    Redis

致行動派讀者

如果你是行動派,本文開頭的截圖勾起你的好奇心的話,那麼可以直接到 Github repository 獲取本文用到的程式碼。 README 檔案將會給你快速和直接的方式去執行示例應用。

接著可以回到本文來了解工作機制!

Flask 和 Celery 一起工作

Flask 與 Celery 整合是十分簡單,不需要任何外掛。一個 Flask 應用需要使用 Celery 的話只需要初始化 Celery 客戶端像這樣:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'
] = 'redis://localhost:6379/0' app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celery.conf.update(app.config)

正如你所見,Celery 通過建立一個 Celery 類物件來初始化,傳入應用名稱以及訊息代理的連線 URL,這個 URL 我把它放在 app.config 中的 CELERY_BROKER_URL 的鍵值。URL 告訴 Celery 代理服務在哪裡執行。如果你執行的不是 Redis,或者代理服務執行在一個不同的機器上,相應地你需要改變 URL。

Celery 其它任何配置可以直接用 celery.conf.update() 通過 Flask 的配置直接傳遞。CELERY_RESULT_BACKEND 選項只有在你必須要 Celery 任務的儲存狀態和執行結果的時候才是必須的。展示的第一個示例是不需要這個功能的,但是第二個示例是需要的,因此最好從一開始就配置好。

任何你需要作為後臺任務的函式需要用 celery.task 裝飾器裝飾。例如:

@celery.task
def my_background_task(arg1, arg2):
    # some long running task here
    return result

接著 Flask 應用能夠請求這個後臺任務的執行,像這樣:

task = my_background_task.delay(10, 20)

delay() 方法是強大的 apply_async() 呼叫的快捷方式。這樣相當於使用 apply_async():

task = my_background_task.apply_async(args=[10, 20])

當使用 apply_async(),你可以給 Celery 後臺任務如何執行的更詳細的說明。一個有用的選項就是要求任務在未來的某一時刻執行。例如,這個呼叫將安排任務執行在大約一分鐘後:

task = my_background_task.apply_async(args=[10, 20], countdown=60)

delay() 和 apply_async() 的返回值是一個表示任務的物件,這個物件可以用於獲取任務狀態。我將會在本文的後面展示如何獲取任務狀態等資訊,但現在讓我們保持簡單些,不用擔心任務的執行結果。

更多可用的選項請參閱 Celery 文件

簡單例子:非同步傳送郵件

我要舉的第一個示例是應用程式非常普通的需求:能夠傳送郵件但是不阻塞主應用。

在這個例子中我會用到 Flask-Mail 擴充套件,我會假設你們熟悉這個擴充套件。

我用來說明的示例應用是一個只有一個輸入文字框的簡單表單。要求使用者在此文字框中輸入一個電子郵件地址,並在提交,伺服器會發送一個測試電子郵件到這個郵件地址。表單中包含兩個提交按鈕,一個立即傳送郵件,一個是一分鐘後傳送郵件。表單的截圖在文章開始。

這裡就是支援這個示例的 HTML 模板:

<html>
  <head>
    <title>Flask + Celery Examples</title>
  </head>
  <body>
    <h1>Flask + Celery Examples</h1>
    <h2>Example 1: Send Asynchronous Email</h2>
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
  </body>`這裡寫程式碼片`
</html>

這裡沒有什麼特別的東西。只是一個普通的 HTML 表單,再加上 Flask 閃現訊息。

Flask-Mail 擴充套件需要一些配置,尤其是電子郵件伺服器傳送郵件的時候會用到一些細節。為了簡單我使用我的 Gmail 賬號作為郵件伺服器:

# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = '[email protected]'

注意為了避免我的賬號丟失的風險,我將其設定在系統的環境變數,這是我從應用中匯入的。

有一個單一的路由來支援這個示例:

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # send the email
    msg = Message('Hello from Flask',
                  recipients=[request.form['email']])
    msg.body = 'This is a test email sent from a background Celery task.'
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(msg)
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute
        send_async_email.apply_async(args=[msg], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))

再次說明,這是一個很標準的 Flask 應用。由於這是一個非常簡單的表單,我決定在沒有擴充套件的幫助下處理它,因此我用 request.method 和 request.form 來完成所有的管理。我儲存使用者在文字框中輸入的值在 session 中,這樣在頁面重新載入後就能記住它。

在這個函式中讓人有興趣的是傳送郵件的時候是通過呼叫一個叫做 send_async_email 的 Celery 任務,該任務呼叫 delay() 或者 apply_async() 方法。

這個應用的最後一部分就是能夠完成作業的非同步任務:

@celery.task
def send_async_email(msg):
    """Background task to send an email with Flask-Mail."""
    with app.app_context():
        mail.send(msg)

這個任務使用 celery.task 裝飾使得成為一個後臺作業。這個函式唯一值得注意的就是 Flask-Mail 需要在應用的上下文中執行,因此需要在呼叫 send() 之前建立一個應用上下文。

重點注意在這個示例中從非同步呼叫返回值並不保留,因此應用不能知道呼叫成功或者失敗。當你執行這個示例的時候,需要檢查 Celery worker 的輸出來排查傳送郵件的問題。

複雜例子:顯示狀態更新和結果

上面的示例過於簡單,後臺作業啟動然後應用忘記它。大部分 Celery 針對網頁開發的教程就到此為止,但是事實上許多應用程式有必要監控它的後臺任務並且獲取執行結果。

我現在將要做的就是擴充套件上面的應用程式成為第二個示例,這個示例展示一個虛構的長時間執行的任務。使用者點選按鈕啟動一個或者更多的長時間執行的任務,在瀏覽器上的頁面使用 ajax 輪詢伺服器更新所有任務的狀態。每一個任務,頁面都會顯示一個圖形的狀態列,進度條,一個狀態訊息,並且當任務完成的時候,也會顯示任務的執行結果。示例的截圖在本文的最開始。

狀態更新的後臺任務

讓我向你們展示我在第二個示例中使用的後臺任務:

@celery.task(bind=True)
def long_task(self):
    """Background task that runs a long function with progress reports."""
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

對於這個任務,我在 Celery 裝飾器中添加了 bind=True 引數。這個引數告訴 Celery 傳送一個 self 引數到我的函式,我能夠使用它(self)來記錄狀態更新。

因為這個任務真沒有幹什麼有用的事情,我決定使用隨機的動詞,形容詞和名詞組合的幽默狀態資訊。你可以在程式碼上看到我用來生成上述資訊的毫無意義的列表。

self.update_state() 呼叫是 Celery 如何接受這些任務更新。有一些內建的狀態,比如 STARTED, SUCCESS 等等,但是 Celery 也支援自定義狀態。這裡我使用一個叫做 PROGRESS 的自定義狀態。連同狀態,還有一個附件的元資料,該元資料是 Python 字典形式,包含目前和總的迭代數以及隨機生成的狀態訊息。客戶端可以使用這些元素來顯示一個漂亮的進度條。每迭代一次休眠一秒,以模擬正在做一些工作。

當迴圈退出,一個 Python 字典作為函式結果返回。這個字典包含了更新迭代計數器,最後的狀態訊息和幽默的結果。

上面的 long_task() 函式在一個 Celery worker 程序中執行。下面你能看到啟動這個後臺作業的 Flask 應用路由:

@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async()
    return jsonify({}), 202, {'Location': url_for('taskstatus',
                                                  task_id=task.id)}

正如你所見,客戶端需要發起一個 POST 請求到 /longtask 來掀開這些任務中的一個的序幕。伺服器啟動任務,並且儲存返回值。對於響應我使用狀態碼 202,這個狀態碼通常是在 REST APIs 中使用用來表明一個請求正在進行中。我也添加了 Location 頭,值為一個客戶端用來獲取狀態資訊的 URL。這個 URL 指向另一個叫做 taskstatus 的 Flask 路由,並且有 task.id 作為動態的要素。

從 Flask 應用中訪問任務狀態

上面提及到 taskstatus 路由負責報告有後臺任務提供的狀態更新。這裡就是這個路由的實現:

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        // job did not start yet
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1),
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)

這個路由生成一個 JSON 響應,該響應包含任務的狀態以及設定在 update_state() 呼叫中作為 meta 的引數的所有值,客戶端可以使用這些構建一個進度條。遺憾地是這個函式需要檢查一些條件,因此程式碼有些長。為了能夠訪問任務的資料,我重新建立了任務物件,該物件是 AsyncResult 類的例項,使用了 URL 中給的任務 id。

第一個 if 程式碼塊是當任務還沒有開始的時候(PENDING 狀態)。在這種情況下暫時沒有狀態資訊,因此我人為地製造了些資料。接下來的 elif 程式碼塊返回後臺的任務的狀態資訊。任務提供的資訊可以通過訪問 task.info 獲得。如果資料中包含鍵 result ,這就意味著這是最終的結果並且任務已經結束,因此我把這些資訊也加到響應中。最後的 else 程式碼塊是任務執行失敗的情況,這種情況下 task.info 中會包含異常的資訊。

不管你是否相信,伺服器所有要做的事情已經完成了。剩下的部分就是需要客戶端需要實現的,在這裡也就是用 JavaScript 指令碼的網頁來實現。

客戶端的 Javascript

這一部分就不是本文的重點,如果你有興趣的話,可以自己研究研究。

對於圖形進度條我使用 nanobar.js,我從 CDN 上引用它。同樣還需要引入 jQuery,它能夠簡化 ajax 的呼叫。

<script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>

啟動連線後臺作業的按鈕的 Javascript 處理程式如下:

function start_long_task() {
    // add task status elements
    div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div>&nbsp;</div></div><hr>');
    $('#progress').append(div);

    // create a progress bar
    var nanobar = new Nanobar({
        bg: '#44f',
        target: div[0].childNodes[0]
    });

    // send ajax POST request to start background job
    $.ajax({
        type: 'POST',
        url: '/longtask',
        success: function(data, status, request) {
            status_url = request.getResponseHeader('Location');
            update_progress(status_url, nanobar, div[0]);
        },
        error: function() {
            alert('Unexpected error');
        }
    });
}

div 的程式碼:

<div class="progress">
    <div></div>         <-- Progress bar
    <div>0%</div>       <-- Percentage
    <div>...</div>      <-- Status message
    <div>&nbsp;</div>   <-- Result
</div>
<hr>

最後 Javascript 的 update_progress 函式程式碼如下:

function update_progress(status_url, nanobar, status_div) {
      // send GET request to status URL
      $.getJSON(status_url, function(data) {
          // update UI
          percent = parseInt(data['current'] * 100 / data['total']);
          nanobar.go(percent);
          $(status_div.childNodes[1]).text(percent + '%');
          $(status_div.childNodes[2]).text(data['status']);
          if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
              if ('result' in data) {
                  // show result
                  $(status_div.childNodes[3]).text('Result: ' + data['result']);
              }
              else {
                  // something unexpected happened
                  $(status_div.childNodes[3]).text('Result: ' + data['state']);
              }
          }
          else {
              // rerun in 2 seconds
              setTimeout(function() {
                  update_progress(status_url, nanobar, status_div);
              }, 2000);
          }
      });
  }

這一部分的程式碼就不一一解釋了。

執行示例

首先下載程式碼,程式碼的位於 Github repository,接著執行以下的命令:

$ git clone https://github.com/miguelgrinberg/flask-celery-example.git
$ cd flask-celery-example
$ virtualenv venv
$ source venv/bin/activate
(venv) $ pip install -r requirements.txt

接著,啟動 redis,關於 redis 的安裝,啟動以及配置,請參閱 Redis 文件。

最後,執行如下命令執行示例:

$ export MAIL_USERNAME=<your-gmail-username>
$ export MAIL_PASSWORD=<your-gmail-password>
$ source venv/bin/activate
(venv) $ celery worker -A app.celery --loglevel=info

執行你的 Flask 應用來感受 Flask 和 Celery 一起工作的快樂:

$ source venv/bin/activate
(venv) $ python app.py