1. 程式人生 > >threadpool和Queue

threadpool和Queue

tps 任務 保持 pools 狀態 queue some call 回收

線程池可以看作容納線程的容器,一個應用程序最多只能有一個線程池。

ThreadPool中的線程不用手動開始,也不能手動取消,你要做的只是把工作函數排入線程池,剩下的工作將由系統自動完成,也就是說我們不能控制線程池中的線程。如果想對線程進行更多的控制,那就不適合使用線程池。在以下情況中不宜使用ThreadPool類而應該使用單獨的Thread類:

  1.線程執行需要很長時間;

  2.需要為線程指定詳細的優先級;

  3.在執行過程中需要對線程進行操作,比如睡眠,掛起等

所以ThreadPool適合於並發運行若幹個運行時間不長且互不幹擾的函數。

ThreadPool的作用:

現在我們逐步向線程池中排入100個任務,看看線程池中的線程數目是如何變化的。從而加深對線程池的理解。為了敘述方便,我們假設下限位5,上限位25.

技術分享圖片

  1.當線程池被創建後,裏面就會創建5個空線程(和下限值相同)

  2.當我們向線程池中排入一個任務後,就會有一個空線程接手該任務,然後運行起來。隨著我們不斷向線程池中排入任務,線程池中的空線程逐一接手任務並被執行。

  3.隨著任務的增加,在某一時刻任務數量會超出下限。當任務請求數量超出下限時,線程池並不會立即創建新線程,而是等待大約500ms左右,這麽做的目的時看看在這段時間內是否有其他線程完成任務來接手這個請求,這樣就可以避免因創建新線程而造成的消耗。如果這段時間內沒有線程完成任務,就創建一個新線程去執行新任務。

  4.當任務數量超過下限後,每排入一個新任務,就會增加一個新線程,這段期間,任務和線程數量都持續增加,直至線程數量達到上線值為止。

  5.當線程數量達到上限時,繼續增加任務,線程數量將不再增加,比如你向線程池中排入100個任務,則只有25個進入線程池(和上限相同),另外75個在線程池外排隊等待。當線程池中的某個線程完成任務後,並不會被終止,而是從等待隊列中選擇一個任務繼續執行,這樣就減少了因創建和銷毀線程而消耗的時間。

  6.當排入所有的任務後,隨著線程池內的任務被逐步完成,線程池外部等待的任務被逐步調入線程池,任務的數量逐步減少,但線程的數量保持恒定,始終和上限值相同。

  7.隨著任務被逐步完成,總有某一刻,任務數量會小於上限值,這時線程池內多余的線程會在空閑2min後被釋放並回收相關資源。線程數目逐步減小,直到達到下限值為止。

  8.當任務數量減小到下限值之下時,線程池中的線程數目保持不變(始終和下限值相同),其中一部分在執行任務,另一部分處於空運行狀態。

  9.當所有任務都完成後,線程池恢復初始狀態,運行5個空線程。

由上面的論述可以看出線程池提高效率的關鍵是一個線程完成任務後可以繼續為其他任務服務,這樣就可以使用有限的幾個固定線程輪流為大量的任務服務,從而減少了因頻繁創建和銷毀線程鎖造成的消耗。

安裝庫:pip install threadpool

pool=ThreadPool(poolsize) #定義了一個線程池,最多可以創建poolsize這麽多的線程

requests=makeRequests(some_callable,list_of_args,callback) #要開啟多線程的函數,參數,回調函數(可不寫)

[pool.putRequests(req) for req in requests] #將所有要運行多線程的請求扔進線程池

pool.wait() #等待所有的線程完成工作後退出

makeRequests(callable,args_list,callback=None,exc_callback=None)
創建多個計算請求,並允許有不同的參數
參數列表中的每一個元素是兩個元素的元祖,分別是位置參數列表和關鍵字參數字典
class ThreadPool
線程池類,發布工作請求並收集結果
__init__(self,num_workers,q_size)
構造函數,設置線程池工作線程數量和最大任務隊列長度,num_workers是初始化時的線程數量。如果q_size>0則會限制工作隊列的長度,並且在工作隊列滿時阻塞繼續插入工作請求的任務。
createWorkers(self,num_workers)
增加工作線程數量
dismissWorkers(self,num_workers)
減少工作線程數量
pool(self,block)
處理隊列中的新結果。也就是循環的調用各個線程結果中的回調和錯誤回調。不過,當請求隊列為空時會拋出NoResultPending異常,以表示所有的結果都處理完了,這個特點對於依賴線程執行結果繼續加入請求隊列的方式不太適合。
putRequest(self,request,block=True,timeout=0)
加入一個任務請求到工作隊列
wait(self)
等待執行結果,知道所有任務完成。
class WorkerThread
工作者線程,供ThreadPool內部使用,不必關註,其自定義方法也只有一個
class WorkRequest
任務請求類
__init__(self,callable,args=None,kwds=None,requestID=None,callback=None,exc_callback=None)
創建一個工作請求

-----------------------------------------------------------

例子:

import time
def sayhello(str):
print "Hello ",str
time.sleep(2)

name_list =[‘xiaozi‘,‘aa‘,‘bb‘,‘cc‘]
start_time = time.time()
for i in range(len(name_list)):
sayhello(name_list[i])
print ‘%d second‘% (time.time()-start_time)

輸出時間為8 second

-----------------------------------------------------------------

import time
import threadpool
def sayhello(str):
print "Hello ",str
time.sleep(2)

name_list =[‘xiaozi‘,‘aa‘,‘bb‘,‘cc‘]
start_time = time.time()
pool = threadpool.ThreadPool(10)
requests = threadpool.makeRequests(sayhello, name_list)
[pool.putRequest(req) for req in requests]
pool.wait()
print ‘%d second‘% (time.time()-start_time)

輸出時間為2 second

threadpool和Queue