1. 程式人生 > 實用技巧 >Flask 流式響應

Flask 流式響應

背景

在 web 場景下,經常會碰到下載檔案的需求,通常小檔案我們會採用 Flasksend_file或者send_from_directory的方式,下載,但是當下載的檔案是一個大壓縮檔案(>1GiB)時,這種方式就顯得不友好了,我們需要採用流式下載的方式返回給客戶端。

流式下載

簡單實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from flask import Response


def (file_path):
def generate():
if not os.path.exists(file_path):
raise "File not found."
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size=10 * 1024 * 1024)
if not chunk:
break
yield chunk

return Response(generate(), content_type="application/octet-stream")
```

執行 Flask app,可以正確下載檔案,但是下載只有實時速度,沒有檔案總大小,導致無法知道下載進度,也沒有檔案型別,這些我們都可以通過增加 header 欄位實現:
```python
response = Response(generate(), mimetype='application/gzip')
response.headers['Content-Disposition'] = 'attachment; filename={}.tar.gz'.format("download_file")
response.headers['content-length'] = os.stat(str(file_path)).st_size
return response

這樣,我們下載檔案就可以看到檔案型別、檔案總大小及已下載大小了,其中mimetype根據實際壓縮檔案型別修改匹配即可。


轉發流式下載

當我們下載本地節點檔案,可以通過上述方法實現,但是如果我們的產品是叢集形式的,要求在叢集中的任一節點均可下載叢集中所有節點的指定檔案,我們就需要支援將流式下載轉發並實時下載,避免訪問節點佔用太多記憶體。

如果是單節點轉發流式請求,我們可以通過 flask 的stream_with_context實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from flask import (
Flask,
Response,
stream_with_context
)
import requests

app = Flask(__name__)

@app.route("/download/<file_path>", method=["GET"])
def (file_path):
url_prefix = "http://1.1.1.1/"
remote_url = url_prefix + file_path
req = requests.get(remote_url, stream = True)
return Response(stream_with_context(req.iter_content()),
content_type = req.headers['content-type'])

if __name__ == "__main__":
app.run(host="0.0.0.0", debug=True)

在我們訪問http://localhost:5000/download/file_name時,通過 requests 訪問遠端節點 1.1.1.1 的地址,並將請求通過流式的方式轉發至客戶端,實現下載。

如果是轉發多節點流式請求,我們該如何保證多個請求最終 merge 後是一個正確的檔案呢?
通過查詢資料,排除了標準庫中的 tarfile 和 zipfile 打包壓縮方式,最終採用 zipstream(https://github.com/allanlei/python-zipstream)第三方庫實現。


zipstream 支援通過迭代器的方式寫入檔案,並可實時壓縮讀取,官方示例如下:

1
2
3
4
5
6
7
8
9
10
def iterable():
for _ in xrange(10):
yield b'this is a byte stringx01n'

z = zipstream.ZipFile()
z.write_iter('my_archive_iter', iterable())

with open('zipfile.zip', 'wb') as f:
for data in z:
f.write(data)


根據上述特性,我們結合轉發單節點請求,實現同時請求多節點並實時壓縮下載:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@app.route("/cluster_download/<file_path>", method=["GET"])
def cluster_download(reqs):
def generate(req):
z = zipstream.ZipFile(mode="w", compression=zipstream.ZIP_DEFLATED)
for req in reqs:
host = req.raw._fp.fp._sock.getpeername()[0]
z.write_iter("%s.tar.gz" % host, req.iter_content(chunk_size=10 * 1024 * 1024)
for chunk in z:
yield chunk

def get_file_size(reqs):
size = 0
for req in reqs:
size += int(req.headers.get("content-length"))
return size

remote_hosts = ["1.1.1.1", "2.2.2.2"]
reqs = []
for host in remote_hosts:
req = requests.get("http://%s/%s" % (host, file_path), timeout=5, stream=True)
if req.status_code == 200:
reqs.append(req)
response = Response(generate(reqs))
response.headers['mimetype'] = 'application/zip'
response.headers['Content-Disposition'] = 'attachment; filename=cluster_logs.zip)
response.hreads['content-length'] = get_file_size(reqs)

當我們訪問http://localhost/cluster_download/file_name時,會先去 remote_hosts 中各個節點下載該檔案,並通過write_iter的方式寫入到 zip 檔案中,Flask Response 返回的是 zip 檔案中的資料塊。


如果我們要在 zip 檔案中增加某些執行過程中產生的資料,我們可以通過再定義一個生成器的方式:

1
2
3
4
def generate_file(content):
yield content

z.write_iter("running_status", generate_file)

這樣我們就可以在最終的 zip 檔案中,包含一個名為running_status的檔案,檔案內容為 content 的內容。

總結

這個需求在日常使用中是很常見的,跟下載類似,上傳檔案的話我們也可以採用類似的方式實現。