[python] ThreadPoolExecutor執行緒池和ProcessPoolExecutor程序池
引言
Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼,但是當專案達到一定的規模,頻繁建立/銷燬程序或者執行緒是非常消耗資源的,這個時候我們就要編寫自己的執行緒池/程序池,以空間換時間。但從Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫執行緒池/程序池提供了直接的支援。
Executor和Future
concurrent.futures模組的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來建立執行緒池和程序池的程式碼。我們可以將相應的tasks直接放入執行緒池/程序池,不需要維護Queue來操心死鎖的問題,執行緒池/程序池會自動幫我們排程。
Future這個概念相信有java和nodejs下程式設計經驗的朋友肯定不陌生了,你可以把它理解為一個在未來完成的操作,這是非同步程式設計的基礎,傳統程式設計模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。關於在Python中進行非同步IO可以閱讀完本文之後參考我的Python併發程式設計之協程/非同步IO。
p.s: 如果你依然在堅守Python2.x,請先安裝futures模組。
pip install futures
ProcessPoolExecutor(n):n表示池裡面存放多少個程序,之後的連線最大就是n的值 submit(fn,*args,**kwargs) 非同步提交任務 map(func, *iterables, timeout=None, chunksize=1) 取代for迴圈submit的操作 shutdown(wait=True) 相當於程序池的pool.close()+pool.join()操作 wait=True,等待池內所有任務執行完畢回收完資源後才繼續,--------》預設 wait=False,立即返回,並不會等待池內的任務執行完畢 但不管wait引數為何值,整個程式都會等到所有任務執行完畢 submit和map必須在shutdown之前 result(timeout=None) #取得結果 add_done_callback(fn) #回撥函式
使用submit來操作執行緒池/程序池
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed import time #模擬網路請求的網路延遲 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times #建立一個大小為2的執行緒池 pool = ThreadPoolExecutor(max_workers=2) #將上個任務提交到執行緒池,因為執行緒池的大小是2,所以必須等task1和task2中有一個完成之後才會將第三個任務提交到執行緒池 task1 = pool.submit(get_html,3) task2 = pool.submit(get_html,2) task3 = pool.submit(get_html,4) #列印該任務是否執行完畢 print(task1.done()) #只有未被提交的到執行緒池(在等待提交的佇列中)的任務才能夠取消 print(task3.cancel()) time.sleep(4)#休眠4秒鐘之後,執行緒池中的任務全部執行完畢,可以列印狀態 print(task1.done()) print(task1.result())#該任務的return 返回值 該方法是阻塞的。
- ThreadPoolExecutor構造例項的時候,傳入max_workers引數來設定執行緒池中最多能同時執行的執行緒數目。
- 使用submit函式來提交執行緒需要執行的任務(函式名和引數)到執行緒池中,並返回該任務的控制代碼(類似於檔案、畫圖),注意submit()不是阻塞的,而是立即返回。
- 通過submit函式返回的任務控制代碼,能夠使用done()方法判斷該任務是否結束。上面的例子可以看出,由於任務有2s的延時,在task1提交後立刻判斷,task1還未完成,而在延時4s之後判斷,task1就完成了。
- 使用cancel()方法可以取消提交的任務,如果任務已經線上程池中運行了,就取消不了。這個例子中,執行緒池的大小設定為2,任務已經在運行了,所以取消失敗。如果改變執行緒池的大小為1,那麼先提交的是task1,task2還在排隊等候,這是時候就可以成功取消。
- 使用result()方法可以獲取任務的返回值。檢視內部程式碼,發現這個方法是阻塞的。
as_completed
上面雖然提供了判斷任務是否結束的方法,但是不能在主執行緒中一直判斷啊。有時候我們是得知某個任務結束了,就去獲取結果,而不是一直判斷每個任務有沒有結束。這是就可以使用as_completed方法一次取出所有任務的結果。
pool = ThreadPoolExecutor(max_workers=2)
urls = [2,3,4]
all_task = [pool.submit(get_html,url) for url in urls]
for future in as_completed(all_task):
data = future.result()
print("in main: get page {}s success".format(data))
#echo
# 執行結果
# get page 2s finished
# in main: get page 2s success
# get page 3s finished
# in main: get page 3s success
# get page 4s finished
# in main: get page 4s success
as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for迴圈下面的語句,然後繼續阻塞住,迴圈到所有的任務結束。從結果也可以看出,先完成的任務會先通知主執行緒。
map
除了上面的as_completed方法,還可以使用executor.map方法,但是有一點不同。
from concurrent.futures import ThreadPoolExecutor
import time
# 引數times用來模擬網路請求的時間
def get_html(times):
time.sleep(times)
print("get page {}s finished".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
for data in executor.map(get_html, urls):
print("in main: get page {}s success".format(data))
#echo
#執行結果
# get page 2s finished
# get page 3s finished
# in main: get page 3s success
# in main: get page 2s success
# get page 4s finished
# in main: get page 4s success
wait
wait方法可以讓主執行緒阻塞,直到滿足設定的要求。
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time
# 引數times用來模擬網路請求的時間
def get_html(times):
time.sleep(times)
print("get page {}s finished".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
#echo
# 執行結果
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main
wait方法接收3個引數,等待的任務序列、超時時間以及等待條件。等待條件return_when預設為ALL_COMPLETED,表明要等待所有的任務都結束。可以看到執行結果中,確實是所有任務都完成了,主執行緒才打印出main。等待條件還可以設定為FIRST_COMPLETED,表示第一個任務完成就停止等待。
ProcessPoolExecutor使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os
def task(n):
print('%s is running'% os.getpid())
time.sleep(random.randint(1,3))
return n
def handle(res):
res=res.result()
print("handle res %s"%res)
if __name__ == '__main__':
#同步呼叫
# pool=ProcessPoolExecutor(8)
#
# for i in range(13):
# pool.submit(task, i).result() #變成同步呼叫,串行了,等待結果
# # pool.shutdown(wait=True) #關門等待所有程序完成
# pool.shutdown(wait=False)#預設wait就等於True
# # pool.submit(task,3333) #shutdown後不能使用submit命令
#
# print('主')
#非同步呼叫
pool=ProcessPoolExecutor(8)
for i in range(13):
obj=pool.submit(task,i)
obj.add_done_callback(handle) #這裡用到了回撥函式
pool.shutdown(wait=True) #關門等待所有程序完成
print('主')
##注意,建立程序池必須在if __name__ == '__main__':中,否則會報錯
##其他的用法和建立執行緒池的一樣
from concurrent.futures import ThreadPoolExecutor
from urllib import request
from threading import current_thread
import time
def get(url):
print('%s get %s'%(current_thread().getName(),url))
response=request.urlopen(url)
time.sleep(2)
# print(response.read().decode('utf-8'))
return{'url':url,'content':response.read().decode('utf-8')}
def parse(res):
res=res.result()
print('parse:[%s] res:[%s]'%(res['url'],len(res['content'])))
# get('http://www.baidu.com')
if __name__ == '__main__':
pool=ThreadPoolExecutor(2)
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://www.openstack.org',
'https://www.openstack.org',
'https://www.openstack.org',
'https://www.openstack.org',
'https://www.openstack.org',
]
for url in urls:
pool.submit(get,url).add_done_callback(parse)