1. 程式人生 > 實用技巧 > urllib 結合 concurrent.futures 多執行緒下載檔案。

urllib 結合 concurrent.futures 多執行緒下載檔案。

示例:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/12/16 10:42
#  @Author:zhangmingda
#  @File: urllib_multi_download.py
#  @Software: PyCharm
#  Description: 使用 urllib 模組實現多執行緒下載某個檔案的測試

from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen
from urllib.request import Request
# NOTE: quote is defined in urllib.parse; this import relies on
# urllib.request re-exporting it.
from urllib.request import quote
import json
import math
import os


class DownLoader(object):
    """Download one HTTP resource as byte-range parts using a thread pool.

    The resource is split into ``part_size``-byte ranges.  Each range is
    fetched with an HTTP ``Range`` request into its own
    ``<filename>.<n>`` part file, and the part files are then
    concatenated, in order, into ``<filename>`` in the current working
    directory.
    """

    def __init__(self):
        self.part_size = 1024 * 1024 * 10  # bytes requested per part
        self.part_thread_num = 10  # max worker threads in the pool
        self.BUFFER_SIZE = 64 * 1024  # bytes per read()/write() call

    def download_part(self, encode_url, part_filename, offset, end_bytes):
        """Fetch one byte range of the resource into a part file.

        :param encode_url: percent-encoded URL of the resource
        :param part_filename: path of the part file to write
        :param offset: first byte to request (inclusive)
        :param end_bytes: last byte to request (inclusive)
        :return: ``{part_filename: ok}`` where ``ok`` is True when the
            part file's size on disk equals the requested range length
        """
        # Build the Range request header for this part.
        range_header = {'Range': 'bytes=%s-%s' % (offset, end_bytes)}
        print(range_header)
        expected_file_size = end_bytes - offset + 1
        part_req = Request(encode_url, headers=range_header)

        with open(part_filename, 'wb') as local_part_fd:
            with urlopen(part_req) as req_fd:
                # Stream the response to disk until the server is done.
                while True:
                    data = req_fd.read(self.BUFFER_SIZE)
                    if not data:
                        break
                    local_part_fd.write(data)

        # Measure only after the file is closed, so every buffered byte
        # has been flushed before the size is compared.
        actual_file_size = os.stat(part_filename).st_size
        cur_task_ret = expected_file_size == actual_file_size
        if cur_task_ret:
            print('%s 與預期塊兒檔案大小相符' % part_filename)
        else:
            print('%s 與預期塊兒檔案大小 不符,預期%s位元組,實際得到%s 位元組' % (
                part_filename, expected_file_size, actual_file_size))
        return {part_filename: cur_task_ret}

    def download(self, url):
        """Download ``url`` in parallel parts and merge them into one file.

        The output file is named after ``os.path.basename(url)``.  Part
        files are removed only when the merged file's size matches the
        reported content length.

        :param url: URL of the resource (not yet percent-encoded)
        :raises RuntimeError: if the response carries no Content-Length
            header, if the number of part results differs from the number
            of planned parts, or if any part has an unexpected size
        """
        finally_filename = os.path.basename(url)
        # Percent-encode the URL, leaving the reserved characters intact.
        encode_url = quote(url, safe=";/?:@&=+$,")
        print(encode_url)

        # Probe the resource to learn its total length.
        req = Request(encode_url)
        with urlopen(req) as fp:
            print(fp.getheaders())
            length = fp.getheader('Content-Length')
        if length is None:
            # Ranges cannot be planned without the total size; fail with a
            # clear message instead of a TypeError from int(None).
            raise RuntimeError(
                "%s did not return a Content-Length header" % encode_url)
        length = int(length)
        print(type(length))
        print('length:', length)

        # Plan the inclusive byte range and part file name of every part.
        chunk_size = self.part_size
        chunk_count = int(math.ceil(length / float(chunk_size)))
        pool_args_list = []
        for i in range(chunk_count):
            offset = chunk_size * i
            end_bytes = min(chunk_size * (i + 1), length) - 1
            part_filename = finally_filename + '.' + str(i + 1)
            pool_args_list.append(
                (encode_url, part_filename, offset, end_bytes))

        # Download all parts concurrently.  The with-block shuts the pool
        # down, so worker threads never outlive this call.
        multi_chunk_download_result = {}
        with ThreadPoolExecutor(max_workers=self.part_thread_num) as tp:
            thread_list = [tp.submit(self.download_part, *args)
                           for args in pool_args_list]
            for part_thread in as_completed(thread_list):
                multi_chunk_download_result.update(part_thread.result())

        print('下載總結')
        # Every planned part must have reported a result.
        if len(multi_chunk_download_result) != chunk_count:
            raise RuntimeError(
                "%s part miss,expect=%d,actual=%d" % (
                    finally_filename, chunk_count,
                    len(multi_chunk_download_result)))
        # Every part must have passed its size check.
        for item, part_ok in multi_chunk_download_result.items():
            if not part_ok:
                raise RuntimeError("%s part download has failed" % item)

        # Concatenate the parts, in order, into the final file.
        with open(finally_filename, 'wb') as local_fd:
            for i in range(chunk_count):
                part_filename = finally_filename + '.' + str(i + 1)
                with open(part_filename, 'rb') as part_fd:
                    while True:
                        bytes_data = part_fd.read(self.BUFFER_SIZE)
                        if not bytes_data:
                            break
                        local_fd.write(bytes_data)

        # Verify after closing so the on-disk size is final.
        final_size = os.stat(finally_filename).st_size
        if length == final_size:
            print('%s 下載完成,檔案大小相符' % finally_filename)
            for part_filename in multi_chunk_download_result:
                os.remove(part_filename)
        else:
            print('%s 下載完成,但大小不符,content_length:%s 下載後大小 %s' % (
                finally_filename, length, final_size))


if __name__ == '__main__':
    downloader = DownLoader()
    url = 'https://ks3-cn-beijing.ksyun.com/zhangmingda/111-3333333.Python安裝與命令列操作.mp4'
    print(url)
    downloader.download(url)