1. 程式人生 > >理解Python併發程式設計-PoolExecutor篇

理解Python併發程式設計-PoolExecutor篇

之前我們使用多執行緒(threading)和多程序(multiprocessing)完成常規的需求,在啟動的時候start、jon等步驟不能省,複雜的需要還要用1-2個佇列。隨著需求越來越複雜,如果沒有良好的設計和抽象這部分的功能層次,程式碼量越多除錯的難度就越大。有沒有什麼好的方法把這些步驟抽象一下呢,讓我們不關注這些細節,輕裝上陣呢?

答案是:有的

從Python3.2開始一個叫做concurrent.futures被納入了標準庫,而在Python2它屬於第三方的futures庫,需要手動安裝:

1234567891011121314151617181920212223242526 ❯ pip install futures``` 這個模組中有2個類:ThreadPoolExecutor和ProcessPoolExecutor,也就是對threading和multiprocessing的進行了高級別的抽象,暴露出統一的介面,幫助開發者非常方便的實現非同步呼叫:```pythonimport timefrom concurrent.futures import ProcessPoolExecutor, as_completedNUMBERS = range(25, 38)def fib
(n):
if n<= 2: return 1 return fib(n-1) + fib(n-2)start = time.time()with ProcessPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result)print 'COST: {}'.format(time.time() - start)

感受下是不是很輕便呢?看一下花費的時間:

123456789101112131415 ❯ python fib_executor.pyfib(25) = 75025fib(26) = 121393fib(27) = 196418fib(28) = 317811fib(29) = 514229fib(30) = 832040fib(31) = 1346269fib(32) = 2178309fib(33) = 3524578fib(34) = 5702887fib(35) = 9227465fib(36) = 14930352fib(37) = 24157817COST: 10.8920350075

除了用map,另外一個常用的方法是submit。如果你要提交的任務的函式是一樣的,就可以簡化成map。但是假如提交的任務函式是不一樣的,或者執行的過程之可能出現異常(使用map執行過程中發現問題會直接丟擲錯誤)就要用到submit:

12345678910111213141516171819202122232425262728 from concurrent.futures import ThreadPoolExecutor, as_completedNUMBERS = range(30, 35)def fib(n): if n == 34: raise Exception("Don't do this") if n<= 2: return 1 return fib(n-1) + fib(n-2)with ThreadPoolExecutor(max_workers=3) as executor: future_to_num = {executor.submit(fib, num): num for num in NUMBERS} for future in as_completed(future_to_num): num = future_to_num[future] try: result = future.result() except Exception as e: print 'raise an exception: {}'.format(e) else: print 'fib({}) = {}'.format(num, result)with ThreadPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result)

執一下:

12345678910111213141516171819202122 ❯ python fib_executor_with_raise.pyfib(30) = 832040fib(31) = 1346269raise an exception: Don't do thisfib(32) = 2178309fib(33) = 3524578Traceback (most recent call last): File "fib_executor_with_raise.py", line 28, in <module> for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map yield future.result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result return self.__get_result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result reraise(self._exception, self._traceback) File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise exec('raise exc_type, exc_value, traceback', {}, locals_) File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run result = self.fn(*self.args, **self.kwargs) File "fib_executor_with_raise.py", line 9, in fib raise Exception("Don't do this")Exception: Don't do this

可以看到,第一次捕捉到了異常,但是第二次執行的時候錯誤直接丟擲來了。

上面說到的map,有些同學馬上會說,這不是程序(執行緒)池的效果嗎?看起來確實是的:

1234567891011121314151617181920 import timefrom multiprocessing.pool import PoolNUMBERS = range(25, 38)def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2)start = time.time()pool = Pool(3)results = pool.map(fib, NUMBERS)for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result)print 'COST: {}'.format(time.time() - start)

好像程式碼量更小喲。好吧,看一下花費的時間:

123456789101112131415 ❯ python fib_pool.pyfib(25) = 75025fib(26) = 121393fib(27) = 196418fib(28) = 317811fib(29) = 514229fib(30) = 832040fib(31) = 1346269fib(32) = 2178309fib(33) = 3524578fib(34) = 5702887fib(35) = 9227465fib(36) = 14930352fib(37) = 24157817COST: 17.1342718601

WhatTF竟然花費了1.7倍的時間。為什麼?

BTW,有興趣的同學可以對比下ThreadPool和ThreadPoolExecutor,由於GIL的緣故,對比的差距一定會更多。

原理

我們就拿ProcessPoolExecutor介紹下它的原理,引用官方程式碼註釋中的流程圖:

123456789101112131415161718 |======================= In-process =====================|== Out-of-process ==|+----------+ +----------+ +--------+ +-----------+ +---------+| | => | Work Ids | => | | => | Call Q | => | || | +----------+ | | +-----------+ | || | | ... | | | | ... | | || | | 6 | | | | 5, call() | | || | | 7 | | | | ... | | || Process | | ... | | Local | +-----------+ | Process || Pool | +----------+ | Worker | | #1..n || Executor | | Thread | | || | +----------- + | | +-----------+ | || | <=> | Work Items | <=> | | <= | Result Q | <= | || | +------------+ | | +-----------+ | || | | 6: call() | | | | ... | | || | | future | | | | 4, result | | || | | ... | | | | 3, except | | |+----------+ +------------+ +--------+ +-----------+ +---------+

我們結合原始碼和上面的資料流分析一下:

  1. executor.map會建立多個_WorkItem物件,每個物件都傳入了新建立的一個Future物件。
  2. 把每個_WorkItem物件然後放進一個叫做「Work Items」的dict中,鍵是不同的「Work Ids」。
  3. 建立一個管理「Work Ids」佇列的執行緒「Local worker thread」,它能做2件事:
    1. 從「Work Ids」佇列中獲取Work Id, 通過「Work Items」找到對應的_WorkItem。如果這個Item被取消了,就從「Work Items」裡面把它刪掉,否則重新打包成一個_CallItem放入「Call Q」這個佇列。executor的那些程序會從佇列中取_CallItem執行,並把結果封裝成_ResultItems放入「Result Q」佇列中。
    2. 從「Result Q」佇列中獲取_ResultItems,然後從「Work Items」更新對應的Future物件並刪掉入口。

看起來就是一個「生產者/消費者」模型罷了,錯了。我們要注意,整個過程並不是多個程序與任務+結果-2個佇列直接通訊的,而是通過一箇中間的「Local worker thread」,它就是讓效率提升的重要原因之一!!!

設想,當某一段程式提交了一個請求,期望得到一個答覆。但服務程式對這個請求可能很慢,在傳統的單執行緒環境下,呼叫函式是同步的,也就是說它必須等到服務程式返回結果後,才能進行其他處理。而在Future模式下,呼叫方式改為非同步,而原先等待返回的時間段,在主呼叫函式中,則可用於處理其他事物。

Future

Future是常見的一種併發設計模式,在多個其他語言中都可以見到這種解決方案。

一個Future物件代表了一些尚未就緒(完成)的結果,在「將來」的某個時間就緒了之後就可以獲取到這個結果。比如上面的例子,我們期望併發的執行一些引數不同的fib函式,獲取全部的結果。傳統模式就是在等待queue.get返回結果,這個是同步模式,而在Future模式下,呼叫方式改為非同步,而原先等待返回的時間段,由於「Local worker thread」的存在,這個時候可以完成其他工作

在tornado中也有對應的實現。2013年的時候,我曾經寫過一篇部落格使用tornado讓你的請求非同步非阻塞,最後也提到了用concurrent.futures實現非同步非阻塞的完成耗時任務。

原文連結:http://www.dongwm.com/archives/使用Python進行併發程式設計-PoolExecutor篇/