python 併發 非同步 下載
阿新 • • 發佈:2019-01-04
下面使用了concurrent.future : 併發入門
例子中的ThreadPoolExecutor可以改成ProcessPoolExecutor 試試看,介面統一
另外ProcessPoolExecutor 的引數預設值:os.cpu_count()
map , submit, as_completed的引數及返回.參考文件
分別使用了map 以及 submit 和 as_completed完成下載;
注意: as_completed 產出的 future 都已經是完成或失敗狀態; 尚在執行的任務不會被跳過, 而是等到它完成(或失敗)後才被產出;
可以修改max_workers引數看看變化
首先是map方法 . 如果你修改了我的程式碼去迭代map的返回值需要注意 迭代過程中會呼叫future.result() 因此會阻塞,
而我的程式碼中會阻塞的原因是在with 塊中,最後會呼叫executor.shutdown,這個函式會等待所有的執行緒完成或失敗
:
from concurrent import futures
import requests, time, sys, os

# Constants
FLAGS = ('CN IN US ID BR PK NG BD RU JP '
         'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'  # download base URL
DEST_DIR = 'downloads/'                   # destination directory
CHUNK_SIZE = 8192                         # streaming chunk size (bytes)
MAX_THREAD = 20                           # at most 20 concurrent threads

"""
Uses requests for the download.  For small files iter_content is unnecessary --
requests.get(url).content would do; streaming (as here) suits large files.
requests docs:
http://docs.python-requests.org/zh_CN/latest/user/quickstart.html
"""


def begin_download(flag):
    """Download one country-flag GIF into DEST_DIR and return the flag code."""
    path = os.path.join(DEST_DIR, flag.lower()+'.gif')
    print('開始下載:%s,儲存位置:%s'%(flag,path),end='\n')
    sys.stdout.flush()  # interleaved thread output should appear promptly
    url = '{}/{flag}/{flag}.gif'.format(BASE_URL, flag=flag.lower())
    # Quick-test alternative: open(path, 'wb').write(requests.get(url).content)
    with requests.get(url,stream=True) as resp:  # stream=True: body fetched lazily
        with open(path,'wb') as fd:
            for chunk in resp.iter_content(CHUNK_SIZE):  # pull the body in chunks
                fd.write(chunk)
    print('%s done!'%flag)
    return flag
# Times the whole download run.
def t_download():
    """Download every flag concurrently (one thread per URL) and print the
    elapsed wall-clock time.

    Worker exceptions propagate out of the ``list(...)`` call below instead of
    being silently discarded.
    """
    start_time = time.time()
    # One thread per URL, capped at MAX_THREAD workers.
    with futures.ThreadPoolExecutor(max_workers=min(len(FLAGS),MAX_THREAD)) as ex:
        # FIX: the original left the map() iterator unconsumed, so any exception
        # raised inside begin_download was silently dropped (map only re-raises
        # on iteration).  Consuming it here surfaces failures; the with-block's
        # implicit executor.shutdown still waits for completion either way.
        results = list(ex.map(begin_download, FLAGS))
    elapsed = time.time() - start_time
    print('all done :{}s'.format(elapsed))


if __name__ == '__main__':
    os.makedirs(DEST_DIR,exist_ok=True)
    t_download()
接下來是submit (返回一個future)和as_completed(接受一個future列表,返回一個生成器,哪個任務先完成就返回其future),
只把上面的 t_download 修改一下:
def t1_download():
    """Download every flag via submit() + as_completed(), reporting which
    flag failed when a task raises, and printing a completion count and the
    elapsed time."""
    start_time = time.time()
    results = []
    with futures.ThreadPoolExecutor(max_workers=min(len(FLAGS),MAX_THREAD)) as ex:
        print('準備新增執行緒..')
        # FIX: keep a future -> flag mapping so a failed task can be named.
        # The original printed `res` in the except branch, but there `res` is
        # either unbound (NameError on the first failure) or the *previous*
        # task's result -- the wrong flag.
        future_tasks = {ex.submit(begin_download, flag): flag for flag in FLAGS}
        print('執行緒新增完畢..')
        # as_completed yields each future as soon as it finishes or fails.
        for f in futures.as_completed(future_tasks):
            try:
                res = f.result()
            except Exception as e:
                print('%s下載失敗, except:%s'%(future_tasks[f],e))
            else:
                results.append(res)
    elapsed = time.time() - start_time
    print('總共完成->%d<-個任務'%len(results))
    print('all done :{}s'.format(elapsed))
下面添加了進度條, 使用tqdm.基本程式碼都沒變:
def t1_download():
    """Same submit() + as_completed() download, wrapped in a tqdm progress bar."""
    start_time = time.time()
    results = []
    with futures.ThreadPoolExecutor(max_workers=min(len(FLAGS),MAX_THREAD)) as ex:
        future_tasks = [ex.submit(begin_download, flag) for flag in FLAGS]
        # as_completed yields a generator of unknown length, so tqdm needs total=
        progress = tqdm.tqdm(futures.as_completed(future_tasks),
                             total=len(future_tasks))
        for future in progress:
            try:
                results.append(future.result())
            except Exception as e:
                print('%s'%e)
    elapsed = time.time() - start_time
    print('總共完成->%d<-個任務'%len(results))
    print('all done :{}s'.format(elapsed))
下面使用了非同步下載:
import os,sys,time,asyncio,aiohttp
# Country codes to fetch, download base URL, and destination directory.
FLAGS = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags' # download base URL
DEST_DIR = 'downloads/' # destination directory
async def fetch(session: aiohttp.ClientSession, url: str, path: str, flag: str):
    """Stream one flag image from *url* into *path*; return the flag code."""
    print(flag, ' 開始下載')
    async with session.get(url) as resp:
        # NOTE(review): the file write is ordinary blocking I/O inside a
        # coroutine -- fine for small GIFs, but it briefly stalls the loop.
        with open(path,'wb') as fd:
            chunk = await resp.content.read(8196)
            while chunk:
                fd.write(chunk)
                chunk = await resp.content.read(8196)
    return flag
async def download():
    """Fan out one fetch task per flag and wait until every task finishes."""
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(
                fetch(session,
                      '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=flag.lower()),
                      os.path.join(DEST_DIR, flag.lower() + '.gif'),
                      flag))
            for flag in FLAGS
        ]
        # asyncio.wait blocks until all tasks are done (it does not re-raise
        # their exceptions).  Alternative: iterate asyncio.as_completed(tasks)
        # to process results in completion order.
        await asyncio.wait(tasks)
# Script driver: run the async download once and report wall-clock time.
os.makedirs(DEST_DIR, exist_ok=True)
loop = asyncio.get_event_loop()
start = time.time()
loop.run_until_complete(download())
end = time.time()
loop.close()
print('耗時:', end - start)