Concurrent Programming with concurrent.futures
Concurrent programming in Python is usually done with the threading and multiprocessing modules, but most concurrent tasks follow the same pattern: spawn a batch of threads, pull work from a queue, and collect the results in another queue. Tasks like these usually call for a thread pool. The concurrent.futures module wraps threading and multiprocessing and makes it easy to create pools.
Installation
In Python 3, concurrent.futures is part of the standard library; in Python 2 you need to install the futures backport yourself:
pip install futures
Executor and Future
concurrent.futures provides two classes, ThreadPoolExecutor and ProcessPoolExecutor, both subclasses of Executor, used to create thread pools and process pools respectively. Both accept a max_workers argument giving the number of threads or processes to create. For ProcessPoolExecutor, max_workers may be omitted, in which case the pool size defaults to the number of CPUs on the machine.
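As a minimal, self-contained sketch of creating both pool types (using a hypothetical square function in place of the network calls used below):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    return x * x

# Thread pool with an explicit worker count.
with ThreadPoolExecutor(max_workers=2) as executor:
    print(executor.submit(square, 3).result())  # 9

# Process pool: with max_workers omitted, the pool size
# defaults to the machine's CPU count (os.cpu_count()).
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(executor.submit(square, 4).result())  # 16
```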
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests

def load_url(url):
    return requests.get(url)

url = 'http://httpbin.org'
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(load_url, url)
Executor defines a submit() method, which schedules a callable task for execution and returns a Future instance. The future's done() method reports whether the task has finished and never blocks; its result() method returns the task's return value and blocks until the task completes.
print(future.done())
print(future.result().status_code)
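If the task raised an exception, result() re-raises it in the calling thread; the future's exception() method returns it instead. A small sketch of this error path, with a hypothetical fail function:

```python
from concurrent.futures import ThreadPoolExecutor

def fail():
    raise ValueError('boom')

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(fail)
    # exception() blocks until the task finishes and returns the
    # raised exception; result() would re-raise it here instead.
    exc = future.exception()

print(type(exc).__name__)  # ValueError
```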
submit() schedules only a single task; to run multiple tasks concurrently, use map() or as_completed().
map
URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    return requests.get(url)

with ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page status_code %s' % (url, data.status_code))
Result:
'http://httpbin.org' page status_code 200
'http://example.com/' page status_code 200
'https://api.github.com/' page status_code 200
map() takes two arguments: the function to execute and a sequence. It applies the function to every element of the sequence and returns a generator of the results.
As the output above shows, the results come back in the same order as the input sequence.
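This ordering guarantee can be checked with a sketch in which earlier inputs deliberately finish later (slow_identity is a hypothetical helper, not part of the library):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_identity(x):
    # The first input sleeps longest, so it finishes last...
    time.sleep(0.3 - 0.1 * x)
    return x

with ThreadPoolExecutor(max_workers=3) as executor:
    # ...yet map() still yields results in input order.
    results = list(executor.map(slow_identity, [0, 1, 2]))

print(results)  # [0, 1, 2]
```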
as_completed
as_completed() returns a generator of Futures. It blocks while no task has finished, yields each task as it completes, and stops once all tasks are done.
from concurrent.futures import as_completed

def load_url(url):
    return url, requests.get(url).status_code

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in URLS]
    for future in as_completed(tasks):
        print(future.result())
Result:
('http://example.com/', 200)
('http://httpbin.org', 200)
('https://api.github.com/', 200)
As you can see, the results are not in the order of the input sequence: whichever task finishes first notifies the main thread first.
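The completion-order behavior can be demonstrated without the network, using a hypothetical delayed helper whose tasks finish in a known order:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def delayed(x):
    time.sleep(x * 0.2)
    return x

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submitted in the order 3, 1, 2...
    futures = [executor.submit(delayed, x) for x in (3, 1, 2)]
    # ...but yielded in completion order: 1, 2, 3.
    completed = [f.result() for f in as_completed(futures)]

print(completed)  # [1, 2, 3]
```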
wait
The wait() function blocks the main thread until the given condition is met. Three conditions are available: ALL_COMPLETED, FIRST_COMPLETED, and FIRST_EXCEPTION.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
from concurrent.futures import as_completed
import requests

URLS = ['http://httpbin.org', 'http://example.com/', 'https://api.github.com/']

def load_url(url):
    requests.get(url)
    print(url)

with ThreadPoolExecutor(max_workers=3) as executor:
    tasks = [executor.submit(load_url, url) for url in URLS]
    wait(tasks, return_when=ALL_COMPLETED)
    print('all_done')
Output:
http://example.com/
http://httpbin.org
https://api.github.com/
all_done
As you can see, the call blocks until all tasks have completed.
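With FIRST_COMPLETED, wait() returns as soon as any task finishes, handing back two sets of futures. A sketch using a hypothetical delayed helper:

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def delayed(x):
    time.sleep(x)
    return x

with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(delayed, 0.1)
    slow = executor.submit(delayed, 1.0)
    # wait() returns a named tuple of (done, not_done) sets;
    # with FIRST_COMPLETED it returns once the fast task finishes.
    done, not_done = wait([fast, slow], return_when=FIRST_COMPLETED)
    print(len(done), len(not_done))  # 1 1
```

Note that leaving the with block still waits for the slow task to finish, because Executor.__exit__ calls shutdown(wait=True).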
ProcessPoolExecutor
ProcessPoolExecutor is used in essentially the same way as ThreadPoolExecutor. Note this sentence from the documentation:
The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.
In other words, the code needs an importable __main__ module, so run it as a script rather than in the interactive interpreter.
def main():
    with ProcessPoolExecutor() as executor:
        tasks = [executor.submit(load_url, url) for url in URLS]
        for f in as_completed(tasks):
            if f.done():
                print(f.result().status_code)

if __name__ == '__main__':
    main()
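Process pools pay off most for CPU-bound work, where threads are held back by the GIL. A self-contained sketch with a hypothetical count_primes function:

```python
from concurrent.futures import ProcessPoolExecutor

def count_primes(n):
    # Naive CPU-bound task: count the primes below n.
    count = 0
    for k in range(2, n):
        if all(k % d for d in range(2, int(k ** 0.5) + 1)):
            count += 1
    return count

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # Each input is checked in a separate worker process.
        print(list(executor.map(count_primes, [10, 100, 1000])))  # [4, 25, 168]
```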