程式人生 > python 併發 非同步 下載

python 併發 非同步 下載

 下面使用了concurrent.future : 併發入門

 例子中的ThreadPoolExecutor可以改成ProcessPoolExecutor 試試看,介面統一

 另外ProcessPoolExecutor 的引數預設值:os.cpu_count()

 map , submit, as_completed的引數及返回.參考文件

 分別使用了map 以及 submit 和 as_completed完成下載;

注意: as_completed 返回的future是完成或失敗的.正在執行的不會返回;

可以修改max_workers引數看看變化

首先是map方法 . 如果你修改了我的程式碼去迭代map的返回值需要注意 迭代過程中會呼叫future.result() 因此會阻塞,

而我的程式碼中會阻塞的原因是在with塊中, 最後會呼叫executor.shutdown, 這個函式會等待所有的執行緒完成或失敗:

from concurrent import futures
import requests,time,sys,os

# Constants
FLAGS = ('CN IN US ID BR PK NG BD RU JP ' 
         'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'        # base download URL
DEST_DIR = 'downloads/'                           # destination directory for saved images
CHUNK_SIZE = 8192                                  # streaming chunk size in bytes
MAX_THREAD = 20                                    # at most 20 concurrent threads

"""
    使用requests 來請求下載;
    對於下載小檔案不需要使用iter_content,
    直接使用requests.get(url).content即可.
    這個例子適用了下載大檔案的情況.
    requests庫的使用可以參考:
    http://docs.python-requests.org/zh_CN/latest/user/quickstart.html
"""
#下載主體
def begin_download(flag):
    """Download one flag image and save it under DEST_DIR.

    Args:
        flag: two-letter country code, e.g. 'CN'.

    Returns:
        The flag code, so callers can tell which task finished.

    Raises:
        requests.HTTPError: if the server answers with an error status.
            (Previously an HTTP error page body was silently written to
            disk as if it were the image.)
    """
    path = os.path.join(DEST_DIR, flag.lower() + '.gif')
    print('開始下載:%s,儲存位置:%s' % (flag, path), end='\n')
    sys.stdout.flush()  # make interleaved output from worker threads appear promptly
    url = '{}/{flag}/{flag}.gif'.format(BASE_URL, flag=flag.lower())
    # stream=True fetches the body lazily in chunks instead of loading it
    # all into memory -- suited to large files.
    with requests.get(url, stream=True) as resp:
        resp.raise_for_status()  # fail loudly instead of saving an error page
        with open(path, 'wb') as fd:
            for chunk in resp.iter_content(CHUNK_SIZE):
                fd.write(chunk)
    print('%s done!' % flag)
    return flag

#計算了檔案下載時間
def t_download():
    """Fan each flag out to its own worker thread via map() and time the batch."""
    t0 = time.time()
    worker_count = min(len(FLAGS), MAX_THREAD)
    # Leaving the with-block calls executor.shutdown(wait=True), so the
    # timing line below only runs once every download thread has finished
    # or failed. Note: map()'s results are not iterated here, so any
    # exception raised inside a worker is never re-raised.
    with futures.ThreadPoolExecutor(max_workers=worker_count) as pool:
        pool.map(begin_download, FLAGS)
    print('all done :{}s'.format(time.time() - t0))
if __name__ == '__main__':
    # Ensure the destination directory exists before any thread writes to it.
    os.makedirs(DEST_DIR,exist_ok=True)
    t_download()

接下來是submit (返回一個future)和as_completed(接受一個future列表,返回一個生成器,哪個任務先完成就返回其future),

只把上面的 t_download 修改一下:

def t1_download():
    """Download all flags using submit() + as_completed(), then report stats.

    submit() returns one Future per task; as_completed() yields each future
    as soon as it finishes (successfully or with an exception) -- futures
    that are still running are never yielded.
    """
    start_time = time.time()
    # Map each future back to its flag so failures can be attributed.
    # BUG FIX: the original printed `res` in the except branch, but `res`
    # is unbound (NameError) when the first future fails, or stale from a
    # previous iteration otherwise.
    future_to_flag = {}
    results = []
    with futures.ThreadPoolExecutor(max_workers=min(len(FLAGS), MAX_THREAD)) as ex:
        print('準備新增執行緒..')
        for flag in FLAGS:
            future_to_flag[ex.submit(begin_download, flag)] = flag
        print('執行緒新增完畢..')
        for f in futures.as_completed(future_to_flag):
            try:
                res = f.result()
            except Exception as e:
                print('%s下載失敗, except:%s' % (future_to_flag[f], e))
            else:
                results.append(res)
    elapsed = time.time() - start_time
    print('總共完成->%d<-個任務' % len(results))
    print('all done :{}s'.format(elapsed))

下面添加了進度條, 使用tqdm.基本程式碼都沒變:

def t1_download():
    """Same as the submit/as_completed version, plus a tqdm progress bar.

    NOTE(review): `tqdm` is used below but never imported in this snippet --
    an `import tqdm` is assumed at module level; confirm before running.
    """
    start_time = time.time()
    future_tasks = []  # one Future per flag, in submission order
    results = []       # flags that downloaded successfully
    with futures.ThreadPoolExecutor(max_workers=min(len(FLAGS),MAX_THREAD)) as ex:
        for flag in FLAGS:
            future_tasks.append(ex.submit(begin_download,flag))
        done_iter = futures.as_completed(future_tasks)
        # Wrap the iterator so the bar advances once per completed future.
        # total= is required because as_completed() has no len().
        done_iter = tqdm.tqdm(done_iter,total=len(future_tasks))
        for future in done_iter:
            try:
                res = future.result()
            except Exception as e:
                print('%s'%e)
            else:
                results.append(res)
    elapsed = time.time() - start_time
    print('總共完成->%d<-個任務'%len(results))
    print('all done :{}s'.format(elapsed))

下面使用了非同步下載:

import os,sys,time,asyncio,aiohttp

FLAGS = ('CN IN US ID BR PK NG BD RU JP ' 
         'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'        # base download URL
DEST_DIR = 'downloads/'                           # destination directory for saved images

async def fetch(session:aiohttp.ClientSession,url:str,path:str,flag:str):
    """Asynchronously download one flag image and save it to `path`.

    Args:
        session: shared aiohttp client session.
        url: full URL of the .gif to fetch.
        path: local file path to write the image to.
        flag: country code, returned so callers can identify the task.

    Returns:
        The flag code.

    Raises:
        aiohttp.ClientResponseError: on an HTTP error status. (Previously
            an error page body was silently written to disk.)
    """
    print(flag, ' 開始下載')
    async with session.get(url) as resp:
        resp.raise_for_status()  # fail loudly instead of saving an error page
        with open(path, 'wb') as fd:
            while True:
                # 8192 (8 KiB) matches CHUNK_SIZE in the threaded section;
                # the original's 8196 was a typo.
                chunk = await resp.content.read(8192)
                if not chunk:
                    break
                fd.write(chunk)
    return flag

async def download():
    """Schedule one fetch task per flag on a shared session and await them all."""
    async with aiohttp.ClientSession() as session:
        # Build every task up front; asyncio.wait blocks until all of them
        # have finished (successfully or with an exception).
        tasks = [
            asyncio.ensure_future(
                fetch(session,
                      '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=flag.lower()),
                      os.path.join(DEST_DIR, flag.lower() + '.gif'),
                      flag))
            for flag in FLAGS
        ]
        await asyncio.wait(tasks)

# Script entry: create the output directory, run the event loop, report timing.
os.makedirs(DEST_DIR,exist_ok=True)
# NOTE(review): get_event_loop() is deprecated for this use since 3.10;
# asyncio.run(download()) is the modern equivalent.
lp = asyncio.get_event_loop()
start = time.time()
lp.run_until_complete(download())  # blocks until every fetch task is done
end = time.time()
lp.close()
print('耗時:',end-start)