concurrent.futures效能
python因為其全域性直譯器鎖GIL而無法通過執行緒實現真正的平行計算。這個論斷我們不展開,但是有個概念我們要說明,IO密集型 vs. 計算密集型。
IO密集型:讀取檔案,讀取網路套接字頻繁。
計算密集型:大量消耗CPU的數學與邏輯運算,也就是我們這裡說的平行計算。
而concurrent.futures模組,可以利用multiprocessing實現真正的平行計算。
核心原理是:concurrent.futures會以子程序的形式,平行的執行多個python直譯器,從而令python程式可以利用多核CPU來提升執行速度。由於子程序與主直譯器相分離,所以他們的全域性直譯器鎖也是相互獨立的。每個子程序都能夠完整的使用一個CPU核心。
第一章 concurrent.futures效能闡述
- 最大公約數
這個函式是一個計算密集型的函式。
# -*- coding:utf-8 -*- # 求最大公約數 def gcd(pair): a, b = pair low = min(a, b) for i in range(low, 0, -1): if a % i == 0 and b % i == 0: return i numbers = [ (1963309, 2265973), (1879675, 2493670), (2030677, 3814172), (1551645, 2229620), (1988912, 4736670), (2198964, 7876293) ]
- 不使用多執行緒/多程序
import time start = time.time() results = list(map(gcd, numbers)) end = time.time() print 'Took %.3f seconds.' % (end - start) Took 2.507 seconds.
消耗時間是:2.507。
- 多執行緒ThreadPoolExecutor
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor start = time.time() pool = ThreadPoolExecutor(max_workers=2) results = list(pool.map(gcd, numbers)) end = time.time() print 'Took %.3f seconds.' % (end - start) Took 2.840 seconds.
消耗時間是:2.840。
上面說過gcd是一個計算密集型函式,因為GIL的原因,多執行緒是無法提升效率的。同時,執行緒啟動的時候,有一定的開銷,與執行緒池進行通訊,也會有開銷,所以這個程式使用了多執行緒反而更慢了。
- 多程序ProcessPoolExecutor
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor start = time.time() pool = ProcessPoolExecutor(max_workers=2) results = list(pool.map(gcd, numbers)) end = time.time() print 'Took %.3f seconds.' % (end - start) Took 1.861 seconds.
消耗時間:1.861。
在兩個CPU核心的機器上執行多程序程式,比其他兩個版本都快。這是因為,ProcessPoolExecutor類會利用multiprocessing模組所提供的底層機制,完成下列操作:
1)把numbers列表中的每一項輸入資料都傳給map。
2)用pickle模組對資料進行序列化,將其變成二進位制形式。
3)通過本地套接字,將序列化之後的資料從煮直譯器所在的程序,傳送到子直譯器所在的程序。
4)在子程序中,用pickle對二進位制資料進行反序列化,將其還原成python物件。
5)引入包含gcd函式的python模組。
6)各個子程序並行的對各自的輸入資料進行計算。
7)對執行的結果進行序列化操作,將其轉變成位元組。
8)將這些位元組通過socket複製到主程序之中。
9)主程序對這些位元組執行反序列化操作,將其還原成python物件。
10)最後,把每個子程序所求出的計算結果合併到一份列表之中,並返回給呼叫者。
multiprocessing開銷比較大,原因就在於:主程序和子程序之間通訊,必須進行序列化和反序列化的操作。
第二章 concurrent.futures原始碼分析
- Executor
可以任務Executor是一個抽象類,提供瞭如下抽象方法submit,map(上面已經使用過),shutdown。值得一提的是Executor實現了__enter__和__exit__使得其物件可以使用with操作符。關於上下文管理和with操作符詳細請參看這篇部落格http://www.cnblogs.com/kangoroo/p/7627167.html
ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來建立執行緒池和程序池的程式碼。
class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" def submit(self, fn, *args, **kwargs): """Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable. Returns: A Future representing the given call. """ raise NotImplementedError() def map(self, fn, *iterables, **kwargs): """Returns a iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order. Raises: TimeoutError: If the entire result iterator could not be generated before the given timeout. Exception: If fn(*args) raises for any values. """ timeout = kwargs.get('timeout') if timeout is not None: end_time = timeout + time.time() fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): try: for future in fs: if timeout is None: yield future.result() else: yield future.result(end_time - time.time()) finally: for future in fs: future.cancel() return result_iterator() def shutdown(self, wait=True): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other methods can be called after this one. Args: wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed. """ pass def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown(wait=True) return False
下面我們以執行緒ProcessPoolExecutor的方式說明其中的各個方法。
- map
map(self, fn, *iterables, **kwargs)
map方法的例項我們上面已經實現過,值得注意的是,返回的results列表是有序的,順序和*iterables迭代器的順序一致。
這裡我們使用with操作符,使得當任務執行完成之後,自動執行shutdown函式,而無需編寫相關釋放程式碼。
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor start = time.time() with ProcessPoolExecutor(max_workers=2) as pool: results = list(pool.map(gcd, numbers)) print 'results: %s' % results end = time.time() print 'Took %.3f seconds.' % (end - start)
產出結果是:
results: [1, 5, 1, 5, 2, 3] Took 1.617 seconds.
- submit
submit(self, fn, *args, **kwargs)
submit方法用於提交一個可並行的方法,submit方法同時返回一個future例項。
future物件標識這個執行緒/程序非同步進行,並在未來的某個時間執行完成。future例項表示執行緒/程序狀態的回撥。
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor start = time.time() futures = list() with ProcessPoolExecutor(max_workers=2) as pool: for pair in numbers: future = pool.submit(gcd, pair) futures.append(future) print 'results: %s' % [future.result() for future in futures] end = time.time() print 'Took %.3f seconds.' % (end - start)
產出結果是:
results: [1, 5, 1, 5, 2, 3] Took 2.289 seconds.
- future
submit函式返回future物件,future提供了跟蹤任務執行狀態的方法。比如判斷任務是否執行中future.running(),判斷任務是否執行完成future.done()等等。
as_completed方法傳入futures迭代器和timeout兩個引數
預設timeout=None,阻塞等待任務執行完成,並返回執行完成的future物件迭代器,迭代器是通過yield實現的。
timeout>0,等待timeout時間,如果timeout時間到仍有任務未能完成,不再執行並丟擲異常TimeoutError
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed start = time.time() with ProcessPoolExecutor(max_workers=2) as pool: futures = [ pool.submit(gcd, pair) for pair in numbers] for future in futures: print '執行中:%s, 已完成:%s' % (future.running(), future.done()) print '#### 分界線 ####' for future in as_completed(futures, timeout=2): print '執行中:%s, 已完成:%s' % (future.running(), future.done()) end = time.time() print 'Took %.3f seconds.' % (end - start)
- wait
wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。
使用wait方法的一個優勢就是獲得更大的自由度,它接收三個引數FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE,預設設定為ALL_COMPLETED。
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION start = time.time() with ProcessPoolExecutor(max_workers=2) as pool: futures = [ pool.submit(gcd, pair) for pair in numbers] for future in futures: print '執行中:%s, 已完成:%s' % (future.running(), future.done()) print '#### 分界線 ####' done, unfinished = wait(futures, timeout=2, return_when=ALL_COMPLETED) for d in done: print '執行中:%s, 已完成:%s' % (d.running(), d.done()) print d.result() end = time.time() print 'Took %.3f seconds.' % (end - start)
由於設定了ALL_COMPLETED,所以wait等待所有的task執行完成,可以看到6個任務都執行完成了。
執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:False, 已完成:False 執行中:False, 已完成:False #### 分界線 #### 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True 執行中:False, 已完成:True Took 1.518 seconds.
如果我們將配置改為FIRST_COMPLETED,wait會等待直到第一個任務執行完成,返回當時所有執行成功的任務。這裡並沒有做併發控制。
重跑,結構如下,可以看到執行了2個任務。
執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:True, 已完成:False 執行中:False, 已完成:False 執行中:False, 已完成:False #### 分界線 #### 執行中:False, 已完成:True 執行中:False, 已完成:True Took 1.517 seconds.