concurrent.futures- 啟動並行任務
阿新 • • 發佈:2018-11-10
python因為其全域性直譯器鎖GIL而無法通過執行緒實現真正的平行計算。這個論斷我們不展開,但是有個概念我們要說明,IO密集型 vs. 計算密集型。
IO密集型:讀取檔案,讀取網路套接字頻繁。
計算密集型:大量消耗CPU的數學與邏輯運算,也就是我們這裡說的平行計算。
而concurrent.futures模組,可以利用multiprocessing實現真正的平行計算。
核心原理是:concurrent.futures會以子程序的形式,平行的執行多個python直譯器,從而令python程式可以利用多核CPU來提升執行速度。由於子程序與主直譯器相分離,所以他們的全域性直譯器鎖也是相互獨立的。每個子程序都能夠完整的使用一個CPU核心。
一、初體驗
Future總結
1. python3自帶,python2需要安裝 2. Executer物件 它是一個抽象類,它提供了非同步執行的方法,他不能直接使用,但可以通過它的子類 ThreadPoolExecuter和ProcessPoolExecuter 2.1 Executer.submit(fn,*args,**kwargs) fn:需要非同步執行的函式 *args,**kwargs fn接受的引數 該方法的作用就是提交一個可執行的回撥task,它返回一個Future物件 2.2 map(fn,*iterables, timeout=None, chunksize=1) map(task,URLS)# 返回一個map()迭代器,這個迭代器中的回撥執行返回的結果是有序的 3. Future物件相關 future可以理解為一個在未來完成的操作,這是非同步程式設計的基礎 通常情況下我們在遇到IO操作的時候,將會發生阻塞,cpu不能做其他事情 而future的引入幫助我們在這段等待時間可以完成其他的操作 3.1 done(): 如果當前執行緒已取消/已成功,返回True。 3.2 cance(): 如果當前執行緒正在執行,並且不能取消呼叫,返回Flase。否則呼叫取消,返回True 3.3 running(): 如果當前的執行緒正在執行,則返回True3.4 result(): 返回呼叫返回的值,如果呼叫尚未完成,則此方法等待 如果等待超時,會丟擲concurrent.futures.TimeoutError 如果沒有指定超時時間,則等待無時間限制 如果在完成之前,取消了Future,則會引發CancelledError 4. as_completed(): 在多個Future例項上的迭代器將會被返回 這些Future例項由fs完成時產生。 由fs返回的任何重複的Future,都會被返回一次。 裡面儲存的都是已經執行完成的Future物件 5. wait(): 返回一個元祖,元祖包含兩個元素 1. 已完成的future集合 2. 未完成的future集合
初體驗
# coding=utf-8 from concurrent import futures from concurrent.futures import Future import time def return_future(msg): time.sleep(3) return msg pool = futures.ThreadPoolExecutor(max_workers=2) t1 = pool.submit(return_future,'hello') t2 = pool.submit(return_future,'world') time.sleep(3) print(t1.done()) # 如果順利完成,則返回True time.sleep(3) print(t2.done()) print(t1.result()) # 獲取future的返回值 time.sleep(3) print(t2.result()) print("主執行緒")
map
(func,* iterables,timeout = None,chunksize = 1 )
# coding=utf-8 import time from concurrent.futures import Future,as_completed from concurrent.futures import ThreadPoolExecutor as Pool import requests import time URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url,timeout=10): return requests.get(url=url,timeout=timeout) pool = Pool() result = pool.map(task,URLS) start_time = time.time() # 按照URLS的順序返回 for res in result: print("{} {}".format(res.url,len(res.content))) # 無序的 with Pool(max_workers=3) as executer: future_task = [executer.submit(task,url) for url in URLS] for f in as_completed(future_task): if f.done(): f_ret = f.result() # f.result()得到task的返回值,requests物件 print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content))) print("耗時",time.time() - start_time) print("主執行緒")
二、Future物件
Future可以理解為一個未來完成的操作
當我們執行io操作的時候,在等待返回結果之前會產生阻塞
cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他操作
from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests import time URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url,timeout=10): return requests.get(url=url,timeout=timeout) # start_time = time.time() # for url in URLS: # ret = task(url) # print("{} {}".format(ret.url,len(ret.content))) # print("耗時",time.time() - start_time) with Pool(max_workers=3) as executor: # 建立future任務 future_task = [executor.submit(task,url) for url in URLS] for f in future_task: if f.running(): print("%s is running"%str(f)) for f in as_completed(future_task): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content))) except Exception as e: f.cance() print(e) """ url不是按照順序返回的,說明併發時,當訪問某一個url時,如果沒有得到返回結果,不會發生阻塞 <Future at 0x1c63990e6d8 state=running> is running <Future at 0x1c639922780 state=running> is running <Future at 0x1c639922d30 state=running> is running <Future at 0x1c63990e6d8 state=finished returned Response>, done, result: http://www.baidu.com/, 2381 <Future at 0x1c639922780 state=finished returned Response>, done, result: https://www.qq.com?fromdefault, 243101 <Future at 0x1c639922d30 state=finished returned Response>, done, result: http://sina.com/, 23103 """
三、模組方法
concurrent.futures.
wait
(fs, timeout=None, return_when=ALL_COMPLETED)
wait()會返回一個tuple,
tuple會包含兩個集合
1. 已完成的集合
2. 未完成的集合
使用wait()會獲得更大的自由度,他接受三個引數
FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE
預設為ALL_COMPLETE
from concurrent.futures import Future from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed,wait import requests URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url,timeout=10): return requests.get(url=url,timeout=timeout) with Pool(max_workers=3) as execute : fulture_task = [execute.submit(task,url) for url in URLS] for f in fulture_task: if f.running(): print("%s"%(str(f))) """ 並且wait還有timeout和return_when兩個引數 return_when有三個常量 FIRST_COMPLETED 任何一個future_task執行完成時/取消時,改函式返回 FIRST_EXCEPTION 任何一個future_task發生異常時,該函式返回,如果沒有異常發生,等同於ALL_COMPLETED ALL_COMPLETED 當所有的future_task執行完畢返回 """ results = wait(fulture_task,return_when="FIRST_COMPLETED")# done = results[0] for d in done: print(d)
concurrent.futures.
as_completed
(fs, timeout=None)
在多個Future例項上的迭代器將會被返回
這些Future例項由fs完成時產生。
由fs返回的任何重複的Future,都會被返回一次。
裡面儲存的都是已經執行完成的Future物件
from concurrent.futures import ThreadPoolExecutor as Pool from concurrent.futures import as_completed import requests import time URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com'] def task(url,timeout=10): return requests.get(url=url,timeout=timeout) with Pool(max_workers=3) as executor: # 建立future任務 future_task = [executor.submit(task,url) for url in URLS] for f in future_task: if f.running(): print("%s is running"%str(f)) for f in as_completed(future_task): try: ret = f.done() if ret: f_ret = f.result() print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content))) except Exception as e: f.cance() print(e)