Flask 教程 第二十二章:後臺作業
這是Flask Mega-Tutorial系列的第二十二部分,我將告訴你如何建立獨立於Web伺服器之外執行的後臺作業。
本章致力於為應用程式中執行時間較長或複雜的非同步任務程序進行優化。這些程序不能在請求的上下文中同步執行,因為這會在任務持續期間阻塞對客戶端的響應。在第十章中,我將郵件的傳送轉移到後臺執行緒中執行,以免阻塞響應。 雖然使用執行緒處理電子郵件是可以接受的,但當問題處理時間更長時,此解決方案就不足以支撐了。 公認的做法是將耗時長的任務移交到worker程序(或程序池)。
為了證明長時間執行任務存在的必要性,我將介紹Microblog的一個匯出功能,使用者通過它可以請求一個包含他們所有使用者動態的資料檔案。 當用戶使用該選項時,應用程式將啟動一個匯出任務,該匯出任務將生成包含所有使用者動態的JSON檔案,然後通過電子郵件傳送給使用者。 所有這些活動都將在worker程序中發生,並且在執行時,使用者可以看到顯示完成百分比的進度。
本章的GitHub連結為:Browse, Zip, Diff.
任務佇列簡介
任務佇列為後臺作業提供了一個便捷的解決方案。 Worker程序獨立於應用程式執行,甚至可以位於不同的系統上。 應用程式和worker之間的通訊是通過訊息佇列完成的。 應用程式提交作業,然後通過與佇列互動來監視其進度。 下圖展示了一個典型的實現:
Python中最流行的任務佇列是Celery。 這是一個相當複雜的軟體包,它有很多選項並支援多個訊息佇列。 另一個流行的Python任務佇列是Redis Queue(RQ),它犧牲了一些靈活性,比如只支援Redis訊息佇列,但作為交換,它的建立要比Celery簡單得多。
Celery和RQ都非常適合在Flask應用程式中支援後臺任務,所以我傾向於選擇更簡單的RQ。 不過,用Celery實現相同的功能其實也不難。 如果你對Celery更感興趣,可以閱讀我的部落格中的Using Celery with Flask文章。
使用RQ
RQ是一個標準的Python三方軟體包,用pip
安裝:
(venv) $ pip install rq
(venv) $ pip freeze > requirements.txt
正如我前面提到的,應用和RQ worker之間的通訊將在Redis訊息佇列中執行,因此你需要執行Redis伺服器。 有許多途徑來安裝和執行Redis伺服器,比如下載其原始碼並執行編譯和安裝。 如果你使用的是Windows,Microsoft在brew install redis
,然後使用redis-server
命令手動啟動服務。
除了確保服務正在執行並可供RQ訪問之外,你不需要與Redis進行其他互動。
建立任務
通過RQ執行一項簡單的任務後,你就會很快熟悉它。 一個任務,不過是一個Python函式而已。 以下是一個示例任務,我將其放入一個新的app/tasks.py模組:
app/tasks.py:示例後臺任務。
import time
def example(seconds):
print('Starting task')
for i in range(seconds):
print(i)
time.sleep(1)
print('Task completed')
該任務將秒數作為引數,然後在該時間量內等待,並每秒列印一次計數器。
執行RQ Worker
任務準備就緒,可以通過rq worker
來啟動一個worker程序了:
(venv) $ rq worker microblog-tasks
18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1
18:55:06 Cleaning registries for queue: microblog-tasks
18:55:06
18:55:06 *** Listening on microblog-tasks...
Worker程序現在連線到了Redis,並在名為microblog-tasks
的佇列上檢視可能分配給它的任何作業。 如果你想啟動多個worker來擴充套件吞吐量,你只需要執行rq worker
來生成更多連線到同一個佇列的程序。 然後,當作業出現在佇列中時,任何可用的worker程序都可以獲取它。 在生產環境中,你可能希望至少執行可用CPU數量的worker。
執行任務
現在開啟第二個終端視窗並激活虛擬環境。 我將使用shell會話來啟動worker中的example()
任務:
>>> from redis import Redis
>>> import rq
>>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://'))
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.get_id()
'c651de7f-21a8-4068-afd5-8b982a6f6d32'
來自RQ的Queue
類表示從應用程式端看到的任務佇列。 它採用的引數是佇列名稱和一個Redis
連線物件,本處使用預設URL進行初始化。 如果你的Redis伺服器執行在不同的主機或埠號上,則需要使用其他URL。
Queue的enqueue()
方法用於將作業新增到佇列中。 第一個引數是要執行的任務的名稱,可直接傳入函式物件或匯入字串。 我發現傳入字串更加方便,因為不需要在應用程式的一端匯入函式。 對enqueue()
傳入的任何剩餘引數將被傳遞給worker中執行的函式。
只要進行了enqueue()
呼叫,執行著RQ worker的終端視窗上就會出現一些活動。 你會看到example()
函式正在執行,並且每秒列印一次計數器。 同時,你的其他終端不會被阻塞,你可以繼續在shell中執行表示式。在上面的例子中,我呼叫job.get_id()
方法來獲取分配給任務的唯一識別符號。 你可以嘗試使用另一個有趣表示式來檢查worker上的函式是否已完成:
>>> job.is_finished
False
如果你像我在上面的例子中那樣傳遞了23
,那麼函式將執行約23秒。 在那之後,job.is_finished
表示式將變為True
。 就是這麼簡單,炫酷否?
一旦函式完成,worker又回到等待作業的狀態,所以如果你想進行更多的實驗,你可以用不同的引數重複執行enqueue()
呼叫。 佇列中儲存的有關任務的資料將保留一段時間(預設為500秒),但最終會被刪除。 這很重要,任務佇列不保留已執行作業的歷史記錄。
報告任務進度
上面使用的示例任務簡單得不現實。 通常,對於長時間執行的任務,你需要將一些進度資訊提供給應用程式,從而可以將其顯示給使用者。 RQ通過使用作業物件的meta
屬性來支援這一點。 讓我重寫example()
任務來編寫進度報告:
app/tasks.py::帶進度的示例後臺任務。
import time
from rq import get_current_job
def example(seconds):
job = get_current_job()
print('Starting task')
for i in range(seconds):
job.meta['progress'] = 100.0 * i / seconds
job.save_meta()
print(i)
time.sleep(1)
job.meta['progress'] = 100
job.save_meta()
print('Task completed')
這個新版本的example()
使用RQ的get_current_job()
函式來獲取一個作業例項,該例項與提交任務時返回給應用程式的例項類似。 作業物件的meta
屬性是一個字典,任務可以編寫任何想要與應用程式通訊的自定義資料。 在這個例子中,我寫入了progress
,表示完成任務的百分比。 每次程序更新時,我都呼叫job.save_meta()
指示RQ將資料寫入Redis,應用程式可以在其中找到它。
在應用程式方面(目前只是一個Python shell),我可以執行此任務,然後監視進度,如下所示:
>>> job = queue.enqueue('app.tasks.example', 23)
>>> job.meta
{}
>>> job.refresh()
>>> job.meta
{'progress': 13.043478260869565}
>>> job.refresh()
>>> job.meta
{'progress': 69.56521739130434}
>>> job.refresh()
>>> job.meta
{'progress': 100}
>>> job.is_finished
True
如你所見,在另一側,meta
屬性可以被讀取。 需要呼叫refresh()
方法來從Redis更新內容。
任務的資料庫表示
對於上面的例子來說,啟動一個任務並觀察它執行就足夠了。 對於Web應用程式,情況會變得更復雜一些,因為一旦任務隨著請求的處理而啟動,該請求隨即結束,而該任務的所有上下文都將丟失。 因為我希望應用程式跟蹤每個使用者正在執行的任務,所以我需要使用資料庫表來維護狀態。 你可以在下面看到新的Task
模型實現:
app/models.py:Task模型。
# ...
import redis
import rq
class User(UserMixin, db.Model):
# ...
tasks = db.relationship('Task', backref='user', lazy='dynamic')
# ...
class Task(db.Model):
id = db.Column(db.String(36), primary_key=True)
name = db.Column(db.String(128), index=True)
description = db.Column(db.String(128))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
complete = db.Column(db.Boolean, default=False)
def get_rq_job(self):
try:
rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis)
except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError):
return None
return rq_job
def get_progress(self):
job = self.get_rq_job()
return job.meta.get('progress', 0) if job is not None else 100
這個模型和以前的模型有一個有趣的區別是id
主鍵欄位是字串型別,而不是整數型別。 這是因為對於這個模型,我不會依賴資料庫自己的主鍵生成,而是使用由RQ生成的作業識別符號。
該模型將儲存符合任務命名規範的名稱(會傳遞給RQ),適用於向用戶顯示的任務描述,該任務的所屬使用者的關係以及任務是否已完成的布林值。complete
欄位的目的是將正在執行的任務與已完成的任務分開,因為執行中的任務需要特殊處理才能顯示最新進度。
get_rq_job()
輔助方法可以用給定的任務ID載入RQJob
例項。 這是通過Job.fetch()
完成的,它會從Redis中存在的資料中載入Job
例項。 get_progress()
方法建立在get_rq_job()
的基礎之上,並返回任務的進度百分比。 該方法做一些有趣的假設,如果模型中的作業ID不存在於RQ佇列中,則表示作業已完成並且資料已過期並已從佇列中刪除,因此在這種情況下返回的百分比為100。 另一方面,如果job存在,但’meta’屬性中找不到進度相關的資訊,那麼可以安全地假定該job計劃執行,但還沒有啟動,所以在這種情況下進度是0。
要將更改應用於資料庫,需要生成新的遷移,然後升級資料庫:
(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade
新模型也可以新增到shell上下文中,以便在shell會話中訪問它時無需匯入:
microblog.py:新增Task模型到shell上下文中。
from app import create_app, db, cli
from app.models import User, Post, Message, Notification, Task
app = create_app()
cli.register(app)
@app.shell_context_processor
def make_shell_context():
return {'db': db, 'User': User, 'Post': Post, 'Message': Message,
'Notification': Notification, 'Task': Task}
將RQ與Flask應用整合
Redis服務的連線URL需要新增到配置中:
class Config(object):
# ...
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
與往常一樣,Redis連線URL將來自環境變數,如果該變數未定義,則會假定該服務在當前主機的預設埠上執行並使用預設URL。
應用工廠函式將負責初始化Redis和RQ:
app/init.py:整合RQ。
# ...
from redis import Redis
import rq
# ...
def create_app(config_class=Config):
# ...
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('microblog-tasks', connection=app.redis)
# ...
app.task_queue
將成為提交任務的佇列。 將佇列附加到應用上會提供很大的便利,因為我可以在應用的任何地方使用current_app.task_queue
來訪問它。 為了方便應用的任何部分提交或檢查任務,我可以在User
模型中建立一些輔助方法:
app/models.py:使用者模型中的任務輔助方法。
# ...
class User(UserMixin, db.Model):
# ...
def launch_task(self, name, description, *args, **kwargs):
rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id,
*args, **kwargs)
task = Task(id=rq_job.get_id(), name=name, description=description,
user=self)
db.session.add(task)
return task
def get_tasks_in_progress(self):
return Task.query.filter_by(user=self, complete=False).all()
def get_task_in_progress(self, name):
return Task.query.filter_by(name=name, user=self,
complete=False).first()
launch_task()
方法負責將任務提交到RQ佇列,並將其新增到資料庫中。 name
引數是函式名稱,如app/tasks.py中所定義的那樣。 提交給RQ時,該函式會將app.tasks.
預先新增到該名稱中以構建符合規範的函式名稱。description
引數是對呈現給使用者的任務的友好描述。 對於匯出使用者動態的函式,我將名稱設定為export_posts
,將描述設定為Exporting posts...
。 其餘引數將傳遞給任務函式。 launch_task()
函式首先呼叫佇列的enqueue()
方法來提交作業。 返回的作業物件包含由RQ分配的任務ID,因此我可以使用它在我的資料庫中建立相應的Task
物件。
請注意,launch_task()
將新的任務物件新增到會話中,但不會發出提交。 一般來說,最好在更高層次函式中的資料庫會話上進行操作,因為它允許你在單個事務中組合由較低級別函式所做的多個更新。 這不是一個嚴格的規則,並且,在本章後面的子函式中也會存在一個例外的提交。
get_tasks_in_progress()
方法返回該使用者未完成任務的列表。 稍後你會看到,我使用此方法在將有關正在執行的任務的資訊渲染到使用者的頁面中。
最後,get_task_in_progress()
是上一個方法的簡化版本並返回指定的任務。 我阻止使用者同時啟動兩個或多個相同型別的任務,因此在啟動任務之前,可以使用此方法來確定前一個任務是否還在執行。
利用RQ任務傳送電子郵件
不要認為本節偏離主題,我在上面說過,當後臺匯出任務完成時,將使用包含所有使用者動態的JSON檔案向用戶傳送電子郵件。 我在第十章中構建的電子郵件功能需要通過兩種方式進行擴充套件。 首先,我需要新增對檔案附件的支援,以便我可以附加JSON檔案。 其次,send_email()
函式總是使用後臺執行緒非同步傳送電子郵件。 當我要從後臺任務傳送一封電子郵件時(已經是非同步的了),基於執行緒的二級後臺任務沒有什麼意義,所以我需要同時支援同步和非同步電子郵件的傳送。
幸運的是,Flask-Mail支援附件,所以我需要做的就是擴充套件send_email()
函式的預設關鍵字引數,然後在Message
物件中配置它們。 選擇在前臺傳送電子郵件時,我只需要新增一個sync=True
的關鍵字引數即可:
app/email.py:傳送帶附件的郵件。
# ...
def send_email(subject, sender, recipients, text_body, html_body,
attachments=None, sync=False):
msg = Message(subject, sender=sender, recipients=recipients)
msg.body = text_body
msg.html = html_body
if attachments:
for attachment in attachments:
msg.attach(*attachment)
if sync:
mail.send(msg)
else:
Thread(target=send_async_email,
args=(current_app._get_current_object(), msg)).start()
Message類的attach()
方法接受三個定義附件的引數:檔名,媒體型別和實際檔案資料。 檔名就是收件人看到的與附件關聯的名稱。 媒體型別定義了這種附件的型別,這有助於電子郵件讀者適當地渲染它。 例如,如果你傳送image/png
作為媒體型別,則電子郵件閱讀器會知道該附件是一個影象,在這種情況下,它可以顯示它。 對於使用者動態資料檔案,我將使用JSON格式,該格式使用application/json
媒體型別。 最後一個引數包含附件內容的字串或位元組序列。
簡單來說,send_email()
的attachments
引數將成為一個元組列表,每個元組將有三個元素對應於attach()
的三個引數。 因此,我需要將此列表中的每個元素作為引數傳送給attach()
。 在Python中,如果你想將列表或元組中的每個元素作為引數傳遞給函式,你可以使用func(*args)
將這個列表或元祖解包成函式中的多個引數,而不必枯燥地一個個地傳遞,如func(args[0], args[1], args[2])
。 例如,如果你有一個列表args = [1, 'foo']
,func(*args)
將會傳遞兩個引數,就和你呼叫func(1, 'foo')
一樣。 如果沒有*
,呼叫將會傳入一個引數,即args
列表。
至於電子郵件的同步傳送,我需要做的就是,當sync
是True
的時候恢復成呼叫mail.send(msg)
。
任務助手
儘管我上面使用的example()
任務是一個簡單的獨立函式,但匯出使用者動態的函式卻需要應用中具有的一些功能,例如訪問資料庫和傳送電子郵件。 因為這將在單獨的程序中執行,所以我需要初始化Flask-SQLAlchemy和Flask-Mail,而Flask-Mail又需要Flask應用例項以從中獲取它們的配置。 因此,我將在app/tasks.py模組的頂部新增Flask應用例項和應用上下文:
app/tasks.py:建立應用及其上下文。
from app import create_app
app = create_app()
app.app_context().push()
應用在此模組中建立,因為這是RQ worker要匯入的唯一模組。 當使用flask
命令時,根目錄中的microblog.py模組建立應用例項,但RQ worker對此卻一無所知,所以當任務函式需要它時,它需要建立自己的應用例項。 你已經在好幾個地方看到了app.app_context()
方法,推送一個上下文使應用成為“當前”的應用例項,這樣一來Flask-SQLAlchemy等外掛才可以使用current_app.config
獲取它們的配置。 沒有上下文,current_app
表示式會返回一個錯誤。
然後我開始考慮如何在這個函式執行時報告進度。除了通過job.meta
字典傳遞進度資訊之外,我還想將通知推送給客戶端,以便自動動態更新完成百分比。為此,我將使用我在第二十一章中構建的通知機制。更新將以與未讀訊息徽章非常類似的方式工作。當伺服器渲染模板時,它將包含從job.meta
獲得的“靜態”進度資訊,但是一旦頁面位於客戶端的瀏覽器中,通知將使用通知來動態更新百分比。由於通知的原因,更新正在執行的任務的進度將比上一個示例中的操作稍微多一些,所以我將建立一個專用於更新進度的包裝函式:
app/tasks.py:設定任務進度。
from rq import get_current_job
from app import db
from app.models import Task
# ...
def _set_task_progress(progress):
job = get_current_job()
if job:
job.meta['progress'] = progress
job.save_meta()
task = Task.query.get(job.get_id())
task.user.add_notification('task_progress', {'task_id': job.get_id(),
'progress': progress})
if progress >= 100:
task.complete = True
db.session.commit()
匯出任務可以呼叫_set_task_progress()
來記錄進度百分比。 該函式首先將百分比寫入job.meta
字典並將其儲存到Redis,然後從資料庫載入相應的任務物件,並使用task.user
已有的add_notification()
方法將通知推送給請求該任務的使用者。 通知將被命名為task_progress
,並且與其關聯的資料將成為具有兩個條目的字典:任務識別符號和進度數值。 稍後我將新增JavaScript程式碼來處理這種新的通知型別。
該函式檢視進度來確認任務函式是否已完成,並在這種情況下更新資料庫中任務物件的complete
屬性。 資料庫提交呼叫確保通過add_notification()
新增的任務和通知物件都立即儲存到資料庫。 我需要非常精確地設計父任務,確保不執行任何資料庫更改,因為執行本呼叫會將父任務的更改也寫入資料庫。
實現匯出任務
現在所有的準備工作已經完成,可以開始編寫匯出函數了。 這個函式的高層結構如下:
app/tasks.py:匯出使用者動態通用結構。
def export_posts(user_id):
try:
# read user posts from database
# send email with data to user
except:
# handle unexpected errors
為什麼將整個任務包裝在try/except塊中呢? 請求處理器中的應用程式碼可以防止意外錯誤,因為Flask本身捕獲異常,然後將它們以我設定的日誌配置的方式來進行處理。 然而,這個函式將執行在由RQ控制的單獨程序中,而非Flask,因此如果發生任何意外錯誤,任務將中止,RQ將向控制檯顯示錯誤,然後返回等待新的job。 所以基本上,除非你正在觀看RQ worker的輸出或將其記錄到檔案中,否則將永遠不會發現有錯誤。
讓我們從上面帶有註釋的三部分中最簡單的錯誤處理部分開始梳理:
app/tasks.py:匯出使用者動態錯誤處理。
import sys
# ...
def export_posts(user_id):
try:
# ...
except:
_set_task_progress(100)
app.logger.error('Unhandled exception', exc_info=sys.exc_info())
每當發生意外錯誤時,我將通過將進度設定為100%來將任務標記為完成,然後使用Flask應用程式中的日誌記錄器物件記錄錯誤以及堆疊跟蹤資訊(呼叫sys.exc_info()
來獲得)。 使用Flask應用日誌記錄器來記錄錯誤的好處在於,你可以觀察到你為Flask應用實現的任何日誌記錄機制。 例如,在第七章中,我配置了要傳送到管理員電子郵件地址的錯誤。 只要使用app.logger
,我也可以得到這些錯誤資訊。
接下來,我將編寫實際的匯出程式碼,它只需發出一個數據庫查詢並在迴圈中遍歷結果,並將它們累積在字典中:
app/tasks.py:從資料庫讀取使用者動態。
import time
from app.models import User, Post
# ...
def export_posts(user_id):
try:
user = User.query.get(user_id)
_set_task_progress(0)
data = []
i = 0
total_posts = user.posts.count()
for post in user.posts.order_by(Post.timestamp.asc()):
data.append({'body': post.body,
'timestamp': post.timestamp.isoformat() + 'Z'})
time.sleep(5)
i += 1
_set_task_progress(100 * i // total_posts)
# send email with data to user
except:
# ...
每條動態都是一個包含兩個條目的字典,即動態正文和動態發表的時間。 時間格式將採用ISO 8601標準。 我使用的Python的datetime
物件不儲存時區,因此在以ISO格式匯出時間後,我添加了’Z’,它表示UTC。
由於需要跟蹤進度,程式碼變得稍微複雜了些。 我維護了一個計數器i
,並且在進入迴圈之前還需要發出一個額外的資料庫查詢,查詢total_posts
以獲得使用者動態的總數。 使用了i
和total_posts
,在每個迴圈迭代我都可以使用從0到100的數字來更新任務進度。
你可能會好奇我為什麼會在每個迴圈迭代中加入time.sleep(5)
呼叫。主要原因是我想要延長匯出所需的時間,以便在使用者動態不多的情況下也可以方便地檢視到匯出進度的增長。
下面是函式的最後部分,將會帶上data
附件傳送郵件給使用者:
app/tasks.py:傳送帶使用者動態的郵件給使用者。
import json
from flask import render_template
from app.email import send_email
# ...
def export_posts(user_id):
try:
# ...
send_email('[Microblog] Your blog posts',
sender=app.config['ADMINS'][0], recipients=[user.email],
text_body=render_template('email/export_posts.txt', user=user),
html_body=render_template('email/export_posts.html', user=user),
attachments=[('posts.json', 'application/json',
json.dumps({'posts': data}, indent=4))],
sync=True)
except:
# ...
其實只是對send_email()
函式的呼叫。 附件被定義為一個元組,其中有三個元素被傳遞給Flask-Mail的Message
物件的attach()
方法。 元組中的第三個元素是附件內容,它是用Python的json.dumps()
函式生成的。
這裡引用了一對新模板,它們以純文字和HTML格式提供電子郵件正文的內容。 這是文字模板的內容:
app/templates/email/export_posts.txt:匯出使用者動態文字郵件模板。
Dear {{ user.username }},
Please find attached the archive of your posts that you requested.
Sincerely,
The Microblog Team
這是HTML版本的郵件模板:
app/templates/email/export_posts.html:匯出使用者動態HTML郵件模板。
<p>Dear {{ user.username }},</p>
<p>Please find attached the archive of your posts that you requested.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>
應用中的匯出功能
所有支援後臺匯出任務的核心元件現已到位。 剩下的就是將這個功能連線到應用,以便使用者發起請求並通過電子郵件傳送使用者動態給他們。
下面是新的export_posts
檢視函式:
app/main/routes.py:匯出使用者動態路由和檢視函式。
@bp.route('/export_posts')
@login_required
def export_posts():
if current_user.get_task_in_progress('export_posts'):
flash(_('An export task is currently in progress'))
else:
current_user.launch_task('export_posts', _('Exporting posts...'))
db.session.commit()
return redirect(url_for('main.user', username=current_user.username))
該函式首先檢查使用者是否有未完成的匯出任務,並在這種情況下只是閃現訊息。 對同一使用者同時執行兩個匯出任務是沒有意義的,可以避免。 我可以使用前面實現的get_task_in_progress()
方法來檢查這種情況。
如果使用者沒有正在執行的匯出任務,則呼叫launch_task()
來啟動它。 第一個引數是將傳遞給RQ worker的函式的名稱,字首為app.tasks.
。 第二個引數只是一個友好的文字描述,將會顯示給使用者。 這兩個值都會被寫入資料庫中的Task物件。 該函式以重定向到使用者個人主頁結束。
現在我需要暴露該路由的連結,以便使用者可以請求匯出。 我認為最合適的地方是在使用者個人主頁,只有在使用者檢視他們自己的主頁時,連結在“編輯個人資料”連結下面顯示:
app/templates/user.html:使用者個人主頁的匯出連結。
...
<p>
<a href="{{ url_for('main.edit_profile') }}">
{{ _('Edit your profile') }}
</a>
</p>
{% if not current_user.get_task_in_progress('export_posts') %}
<p>
<a href="{{ url_for('main.export_posts') }}">
{{ _('Export your posts') }}
</a>
</p>
...
{% endif %}
此連結的渲染是有條件的,因為我不希望它在使用者已經有匯出任務執行時出現。
此時的後臺作業是可以運作的,但是不會向用戶提供任何反饋。 如果你想嘗試一下,你可以按如下方式啟動應用和RQ worker:
- 確保Redis正在執行
- 開啟一個終端視窗,啟動至少一個RQ worker例項。本處你可以執行命令
rq worker microblog-tasks
- 再開啟另一個終端視窗,使用
flask run
(記得先設定FLASK_APP
變數)命令啟動Flask應用
進度通知
為了完善這個功能,我想在後臺任務執行時提醒使用者任務完成的百分比進度。 在瀏覽Bootstrap元件選項時,我決定在導航欄的下方使用一個Alert元件。 Alert元件是向用戶顯示資訊的帶顏色的橫條。 我用藍色的Alert框來渲染閃現的訊息。 現在我要新增一個綠色的Alert框來顯示任務進度。 樣式如下:
app/templates/base.html:基礎模板中的匯出進度Alert元件。
...
{% block content %}
<div class="container">
{% if current_user.is_authenticated %}
{% with tasks = current_user.get_tasks_in_progress() %}
{% if tasks %}
{% for task in tasks %}
<div class="alert alert-success" role="alert">
{{ task.description }}
<span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>%
</div>
{% endfor %}
{% endif %}
{% endwith %}
{% endif %}
...
{% endblock %}
...
渲染任務Alert元件的方法幾乎與閃現訊息相同。 外部條件在使用者未登入時跳過所有與Alert相關的標記。而對於已登入使用者,我通過呼叫前面建立的get_tasks_in_progress()
方法來獲取當前正在進行的任務列表。 在當前版本的應用中,我最多隻能得到一個結果,因為我不允許多個匯出任務同時執行,但將來我可能要支援可以共存的其他型別的任務,所以以通用的方式渲染Alert可以節省我以後的時間。
對於每項任務,我都會在頁面上渲染一個Alert元素。 Alert的顏色由第二個CSS樣式控制,本處是alert-success
,而在閃現訊息是alert-info
。 Bootstrap文件包含有關Alert的HTML結構的詳細資訊。 Alert文字包括儲存在Task
模型中的description
欄位,後面跟著完成百分比。
百分比被封裝在具有id
屬性的<span>
元素中。 原因是我要在收到通知時用JavaScript重新整理百分比。 我給任務ID末尾附加-progress
來構造id
屬性。 當有通知到達時,通過其中的任務ID,我可以很容易地使用#<task.id>-progress
選擇器找到正確的<span>
元素來更新。
如果你此時進行嘗試,則每次導航到新頁面時都會看到“靜態”的進度更新。 你可以注意到,在啟動匯出任務後,你可以自由導航到應用程式的不同頁面,正在執行的任務的狀態始終都會展示出來。
為了對span>
元素的百分比的動態更新做準備,我將在JavaScript端編寫一個輔助函式:
app/templates/base.html:動態更新任務進度的輔助函式。
...
{% block scripts %}
...
<script>
...
function set_task_progress(task_id, progress) {
$('#' + task_id + '-progress').text(progress);
}
</script>
...
{% endblock %}
這個函式接受一個任務id
和一個進度值,並使用jQuery為這個任務定位<span>
元素,並將新進度作為其內容寫入。 實際上不需要驗證頁面上是否存在該元素,因為如果沒有找到該元素,jQuery將不會執行任何操作。
app/tasks.py中的_set_task_progress()
函式每次更新進度時呼叫add_notification()
,就會產生新的通知。 而我在第二十一章明智地以完全通用的方式實現了通知功能。 所以當瀏覽器定期向伺服器傳送通知更新請求時,瀏覽器會獲得通過add_notification()
方法新增的任何通知。
但是,這些JavaScript程式碼只能識別具有unread_message_count
名稱的那些通知,並忽略其餘部分。 我現在需要做的是擴充套件該函式,通過呼叫我上面定義的set_task_progress()
函式來處理task_progress
通知。 以下是處理通知更新版本JavaScript程式碼:
app/templates/base.html:通知處理器。
for (var i = 0; i < notifications.length; i++) {
switch (notifications[i].name) {
case 'unread_message_count':
set_message_count(notifications[i].data);
break;
case 'task_progress':
set_task_progress(
notifications[i].data.task_id,
notifications[i].data.progress);
break;
}
since = notifications[i].timestamp;
}
現在我需要處理兩個不同的通知,我決定用一個switch
語句替換檢查unread_message_count
通知名稱的if
語句,該語句包含我現在需要支援的每個通知。 如果你對“C”系列語言不熟悉,就可能從未見過switch語句,它提供了一種方便的語法,可以替代一長串的if/elseif
語句。這是一個很棒的特性,因為當我需要支援更多通知時,只需簡單地新增case
塊即可。
回顧一下,RQ任務附加到task_progress
通知的資料是一個包含兩個元素task_id
和progress
的字典,這兩個元素是我用來呼叫set_task_progress()
的兩個引數。
如果你現在執行該應用,則綠色Alert框中的進度指示器將每10秒重新整理一次(因為重新整理通知的時間間隔是10秒)。
由於本章介紹了新的可翻譯字串,因此需要更新翻譯檔案。 如果你要維護非英語語言檔案,則需要使用Flask-Babel重新整理翻譯檔案,然後新增新的翻譯:
(venv) $ flask translate update
如果你使用的是西班牙語翻譯,那麼我已經為你完成了翻譯工作,因此可以從下載包中提取app/translations/es/LC_MESSAGES/messages.po檔案,並將其新增到你的專案中。
翻譯檔案到位後,還要編譯翻譯檔案:
(venv) $ flask translate compile
部署注意事項
為了完成本章,我還要討論應用程式部署的變化。 為了支援後臺任務,我在部署棧中增加了兩個新元件,一個Redis伺服器和一/多個RQ worker。 很明顯,它們需要包含在部署策略中,因此我將簡要介紹前幾章中不同部署方式的一些調整。
部署到Linux伺服器
如果你正在Linux伺服器上執行應用,則新增Redis十分簡單。 對於Ubuntu Linux,你可以執行sudo apt-get install redis-server
來安裝Redis伺服器。
要執行RQ worker程序,可以按照第十七章中“設定Gunicorn和Supervisor”一節那樣建立第二個Supervisor配置,在其中執行的命令改成rq worker microblog-tasks
。 如果你想要執行多個worker(假設是生產環境),則可以使用Supervisor的numprocs
指令來指示要同時執行多少個例項。
部署到Heroku
要在Heroku上部署應用,你需要將Redis服務新增到你的帳戶。 這與我新增Postgres資料庫的過程類似。 Redis也有一個免費檔次,可以使用以下命令新增:
$ heroku addons:create heroku-redis:hobby-dev
新的redis服務的訪問URL將作為REDIS_URL
變數新增到你的Heroku環境中,這正是應用所需的。
Heroku的免費方案允許同時啟動一個web程序和一個worker程序,因此你可以在免費的情況下啟動一個rq
worker程序。 為此,你將需要在procfile的一個單獨的行中宣告worker:
web: flask db upgrade; flask translate compile; gunicorn microblog:app
worker: rq worker microblog-tasks
將這些變更重新部署之後,可以使用以下命令啟動worker:
$ heroku ps:scale worker=1
部署到Docker
如果你將應用程式部署到Docker容器,那麼首先需要建立一個Redis容器。 為此,你可以使用Docker映象倉庫中的其中一個官方Redis映象:
$ docker run --name redis -d -p 6379:6379 redis:3-alpine
當執行你的應用時,你需要以類似於MySQL容器的連結方式,連結redis容器並設定REDIS_URL
環境變數。 下面是一個完整的命令來啟動應用,包含了一個redis連結:
$ docker run --name microblog -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key \
-e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
-e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
--link mysql:dbserver --link redis:redis-server \
-e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
-e REDIS_URL=redis://redis-server:6379/0 \
microblog:latest
最後,你需要為RQ worker執行一/多個容器。 由於worker與主應用具有相同的程式碼,因此可以使用與應用相同的容器映象,並覆蓋啟動命令,以便啟動worker而不是Web應用。 以下是啟動worker的docker run
命令:
$ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \
-e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \
-e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \
--link mysql:dbserver --link redis:redis-server \
-e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \
-e REDIS_URL=redis://redis-server:6379/0 \
--entrypoint venv/bin/rq \
microblog:latest worker -u redis://redis-server:6379/0 microblog-tasks
覆蓋Docker映象的預設啟動命令有點棘手,因為命令需要分兩部分給出。 --entrypoint
引數只取得可執行檔案的名稱,但是引數(如果有的話)需要在映象和標籤之後,也就是在命令列的結尾處給出。 請注意rq
命令需要使用venv/bin/rq
,以便在沒有手動啟用虛擬環境的情況下,也能識別虛擬環境並正常工作。