建立程序池與執行緒池concurrent.futures模組的使用
阿新 • • 發佈:2019-09-21
一、程序池。
當併發的任務數量遠遠大於計算機所能承受的範圍,即無法一次性開啟過多的任務數量就應該考慮去 限制程序數或執行緒數,從而保證伺服器不會因超載而癱瘓。這時候就出現了程序池和執行緒池。
二、concurrent.futures模組介紹
concurrent.futures模組提供了高度封裝的非同步呼叫介面
ThreadPoolExecutor:執行緒池,提供非同步呼叫
ProcessPoolExecutor:程序池,提供非同步呼叫
Both implement the same interface, which is defined by the abstract Executor class
三、基本方法:
submit(fn, *args, **kwargs)
:非同步提交任務
map(func, *iterables, timeout=None, chunksize=1)
:取代for迴圈submit的操作
shutdown(wait=True)
:相當於程序池的pool.close()+pool.join()
操作
- wait=True,等待池內所有任務執行完畢回收完資源後才繼續
- wait=False,立即返回,並不會等待池內的任務執行完畢
- 但不管wait引數為何值,整個程式都會等到所有任務執行完畢
- submit和map必須在shutdown之前
result(timeout=None)
add_done_callback(fn)
:回撥函式
done()
:判斷某一個執行緒是否完成
cancle()
:取消某個任務
四、程序池程式碼例項——ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor from multiprocessing import current_process import time def func(i): print(f'程序 {current_process().name} 正在執行任務 {i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 程序池只有4個程序 lt = [] for i in range(20): # 假設執行20個任務 future = pool.submit(func,i) # func任務要做20次,4個程序負責完成這個20個任務 # print(future.result()) # 如果沒有結果就一直等待拿到結果,導致了所有任務都在序列 lt.append(future) pool.shutdown() # 預設為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞, for fu in lt: print(fu.result()) # 等待所有的任務都執行完了,一起把返回值打印出來
五、執行緒池程式碼示例——ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import time
def func(i):
print(f'執行緒 {currentThread().name} 正在執行任務 {i}')
time.sleep(1)
return i**2
if __name__ == '__main__':
fool = ThreadPoolExecutor(4) # 執行緒池裡只有4個執行緒
lt = []
for i in range(20):
futrue = fool.submit(func,i) # func任務要做20次,4個執行緒負責完成這20次任務
lt.append(futrue)
fool.shutdown() # 預設為True,關閉了池的入口,會等待所有的任務執行完,結束阻塞,
for fu in lt:
print(fu.result()) # 等待所有的任務都執行完了,一起把返回值打印出來
六、回撥函式add_done_callback(fn)
提交任務的兩種方式:
同步: 提交了一個任務,必須等任務執行完了(拿到返回值),才能執行下一行程式碼
非同步: 提交了一個任務,不要等執行完了,可以直接執行下一行程式碼。
ps:程序和執行緒回撥方法的使用寫一塊了,註釋掉的是程序的使用。
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time
def task(i):
print(f'執行緒 {currentThread().name} 正在執行任務 {i}')
# print(f'程序 {current_process().name} 正在執行任務 {i}')
time.sleep(1)
return i**2
def parse(futrue):
# 處理拿到的結果
print(futrue.result())
if __name__ == '__main__':
pool = ThreadPoolExecutor(4) # 執行緒池裡只有4個執行緒
# pool = ProcessPoolExecutor(4) # 程序池裡只有4個程序
lt = []
for i in range(20):
futrue = pool.submit(task,i) # task任務要做20次,分別由四個程序完成這20個任務
futrue.add_done_callback(parse)
# 為當前任務繫結一個函式,在當前任務執行結束的時候會觸發這個函式
# 會把futrue物件作為引數傳給函式
# 這個稱之為回撥函式,處理完了回來就呼叫這個函式。
跟上面執行緒池裡的例子相比:回撥函式的作用,不需要等待所有的任務執行完才打印返回值。每執行完一個任務直接列印結果,實現一個併發的效果,效率有所提升