1. 程式人生 > >利用Queue實現的Flask下的資源池

利用Queue實現的Flask下的資源池

最近在開發flask網站程式的時候,遇到一個資源竟態分配的問題,嘗試使用python中的Queue來解決

1 業務場景

在我們的業務中,我們提供了很多臺裝置可以供使用者連結使用,使用者通過介面來申請裝置使用,在資料庫中維護了裝置的狀態,可用、忙碌和不可用,分配的策略是從資料庫中取出可用的裝置列表,然後通過呼叫一個測試連結函式來確認是否實際可用,如果確實可用則把該裝置標記為忙碌,然後把裝置資訊返回給使用者,使用者根據資訊連結該裝置進行操作。程式使用Flask框架,結合sqlalchemy元件,第一版本程式碼結構大體如下:

#從資料庫獲取所有的可用裝置
idle_devices = Device.query.filter_by(state=DEVICE_STATE_IDLE).all()
#尋找第一個可用裝置分配給使用者
for device in idle_devices: if device_available(device): idle_device = device break #沒有可用裝置返回空 if idle_device is None: return jsonify(BaseApi.api_success("")) #更新裝置表 idle_device.state = DEVICE_STATE_BUSY db.session.add(idle_device) return idle_device

2 問題

上面這個版本的程式碼在剛上線的時候就出問題了,裝置表中只有一臺可用裝置時,卻讓兩個使用者都分配到了裝置資源,也就是說裝置被重複分配了,通過觀察程式碼不難發現,在多使用者訪問的情況下,上面的程式碼是有問題的,從資料庫中取到的可用裝置,在真正分配的時候可能就不可用了。所以這個寫法是有問題的。另外除了資源重複分配的問題,在每次使用者申請的時候都要取出所有的可用裝置挨個遍歷可用,這種在裝置數量很大的時候,是不可取的。嘗試使用改進方案。

3 python Queue

Python中,佇列是執行緒間最常用的交換資料的形式,Queue模組是提供佇列操作的模組。

3.1 建立佇列

import Queue
q = Queue.Queue(maxsize = 10)

Queue.Queue類即是一個佇列的同步實現。佇列長度可為無限或者有限。可通過Queue的建構函式的可選引數maxsize來設定佇列長度。如果maxsize小於1就表示佇列長度無限。

3.2 插入值

q.put(item)

呼叫佇列物件的put()方法在隊尾插入一個專案。put()有兩個引數,第一個item為必需的,為插入專案的值;第二個block為可選引數,預設為1。如果隊列當前為空且block為1,put()方法就使呼叫執行緒暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。

3.3 從佇列中取值

q.get()

呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True,get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。

4 Flask的上下文環境

下表展示了Flask上下文環境的四種變數。

變數名 上下文 說明
current_app 程式上下文 當前啟用程式的程式例項
g 程式上下文 處理請求時用作臨時儲存的物件。每次請求都會重設這個變數
request 請求上下文 請求物件,封裝了客戶端發出的HTTP 請求中的內容
session 請求上下文 使用者會話,用於儲存請求之間需要“記住”的值的詞典

在這裡如果想做跨請求的資料共享,唯一適合使用的是current_app。

5 解決方案

結合Queue的特點和Flask的上下文環境,可以設計如下的解決方案,在應用啟動的時候,從MYSQL中讀出所有可用裝置的id,新增到一個cuerrent_app範圍的的Queue中,然後在每次裝置分配的時候從佇列中讀取一個裝置id,如果該id對應的裝置不可用繼續讀取,直到讀到可用裝置或者佇列為空,當每次使用者釋放裝置的時候再將裝置id新增到佇列中。我在實際編碼中使用Flask的請求鉤子函式,定義了一個在所有請求到來之前執行的函式來初始化該佇列程式碼如下:

@api.before_app_first_request
def init_queue():
    idle_devices = Device.query.filter_by(state=DEVICE_STATE_IDLE).all()
    q = Queue.Queue(0)
    for device in idle_devices:
        q.put(device.id)
    app.devices = q

6 結果

經過這樣的改進之後,沒有再出現重複分配的問題了,同時也一定程度上提高了程式執行的效率。

當然這個程式有一個潛在的問題,就是隻能支援單程序執行,如果同時起了多個程序還是會出現資源重複分配的問題,這裡後續可以考慮使用Redis 佇列來替代python的Queue模組。