python-執行緒池-程序池
阿新 • • 發佈:2020-08-28
執行緒池程序池 concurrent.futures
- 什麼是池
- 要在程式開始的時候,還沒提交任務先建立幾個執行緒或者程序放在一個池子裡,這就是池
- 為什麼要用池
- 如果先開好程序/執行緒,那麼有任務之後就可以直接使用這個池中的資料了
- 並且開好的執行緒或者程序會一直存在在池中,可以被多個任務反覆利用。這樣極大的減少了開啟\關閉\排程執行緒/程序的時間開銷
- 池中的執行緒/程序個數控制了作業系統需要排程的任務個數,控制池中的單位,有利於跳作業系統的效率,減輕作業系統的負擔
concurrent.futures模組
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
ThreadPoolExecutor
執行緒池 ProcessPoolExecutor
程序池
- 例項化建立池 和 向池中提交任務:submit
- 執行緒池
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import current_thread def func(): print(current_thread().ident) t_pool = ThreadPoolExecutor(4) # 執行緒池中執行緒的數量為4 for i in range(20): t_pool.submit(func) # submit非同步提交任務
import time, random from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from threading import current_thread def func(): print(current_thread().ident, 'start') time.sleep(random.randint(1, 4)) print(current_thread().ident, 'end') t_pool = ThreadPoolExecutor(4) # 執行緒池中執行緒的數量為4 for i in range(20): t_pool.submit(func) # submit非同步提交任務
- 傳引數(可以位置傳參,關鍵字傳參)
import time, random
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread
def func(name,name2):
print(current_thread().ident, name)
time.sleep(random.randint(1, 4))
print(current_thread().ident, name2)
t_pool = ThreadPoolExecutor(4) # 執行緒池中執行緒的數量為4
for i in range(20):
t_pool.submit(func, 'alex', 'taibai') # submit非同步提交任務
- 開啟程序池(程序池不要忘了加
if __name__ = '__main__':
)
import time, random
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def func(name,name2):
print(os.getpid(), name) # 當前程序pid
time.sleep(random.randint(1, 4))
print(os.getpid(), name2)
if __name__ == '__main__': # 記得加
t_pool = ProcessPoolExecutor(4) # 程序池中程序的數量為4
for i in range(20):
t_pool.submit(func, 'alex', 'taibai') # submit非同步提交任務
- 獲取任務結果
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
import random
def func(a, b):
print(os.getpid(), 'start', a, b)
time.sleep(random.randint(1, 4))
print(os.getpid(), 'end')
return a+b
if __name__ == '__main__':
tp = ProcessPoolExecutor(4)
future_l = {}
for i in range(20): # 非同步非阻塞的
ret = tp.submit(func, i, i+1)
future_l[i] = ret
# print(ret.result()) # 變成串行了
for key in future_l: # 同步阻塞的
print(key, future_l[key].result())
- map(只適合傳遞簡單的引數,並且必須是一個可迭代的資料型別作為引數)
# 效果和上面獲取任務結果一樣,只是少了個for迴圈
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
import random
def func(a):
b = a+1
print(os.getpid(), 'start', a, b)
time.sleep(random.randint(1, 4))
print(os.getpid(), 'end')
return a+b
if __name__ == '__main__':
tp = ProcessPoolExecutor(4)
ret = tp.map(func, range(20))
for key in ret: # 同步阻塞
print(key)
- 回撥函式
ret.add_done_callback(函式)
(效率最高)
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import random
def func(a, b):
print(current_thread().ident, 'start', a, b)
time.sleep(random.randint(1, 4))
print(current_thread().ident, 'end')
return a+b # 可以做個標識 return (a, a+b)
def print_func(ret): # 非同步阻塞
print(ret.result())
if __name__ == '__main__':
tp = ThreadPoolExecutor(4) # 開四個執行緒
for i in range(20): # 非同步非阻塞
ret = tp.submit(func, i, i+1) # 往執行緒池中提交任務
ret.add_done_callback(print_func) # 回撥函式
# ret這個任務再執行完畢的瞬間立即觸發print_func函式,並且把任務的返回值物件傳遞到print_func做引數
# 非同步阻塞 回撥函式 給ret物件繫結一個回撥函式,等待ret對應的任務有了結果之後立即呼叫print_func這個函式,就可以對結果立即進行處理,而不用按照順序接收處理結果。。非同步阻塞
- shutdown
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os
import random
def func(a, b):
print(os.getpid(), 'start', a, b)
time.sleep(random.randint(1, 4))
print(os.getpid(), 'end')
return a+b
if __name__ == '__main__':
tp = ProcessPoolExecutor(4)
future_l = {}
for i in range(20): # 非同步非阻塞的
ret = tp.submit(func, i, i+1)
future_l[i] = ret
tp.shutdown() # 關閉執行緒池,等待執行緒池中所有的任務執行完畢
# print(ret.result()) # 變成串行了
for key in future_l: # 同步阻塞的
print(key, future_l[key].result())