Python中的多執行緒程式設計,執行緒安全與鎖(二) Python中的多執行緒程式設計,執行緒安全與鎖(一)
在我的上篇博文Python中的多執行緒程式設計,執行緒安全與鎖(一)中,我們熟悉了多執行緒程式設計與執行緒安全相關重要概念, Threading.Lock實現互斥鎖的簡單示例,兩種死鎖(迭代死鎖和互相等待死鎖)情況及處理。今天我們將聚焦於Python的Threading模組總結和執行緒同步問題。
1. Threading模組總結
1.1 Threading模組概覽
threading用於提供執行緒相關的操作,執行緒是應用程式中工作的最小單元。python當前版本的多執行緒庫沒有實現優先順序、執行緒組,執行緒也不能被停止、暫停、恢復、中斷。
threading模組提供的類:
threading 模組提供的常用方法:
threading.currentThread(): 返回當前的執行緒變數。
threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
threading 模組提供的常量:
threading.TIMEOUT_MAX 設定threading全域性超時時間。
1.2 Thread類
Thread是執行緒類,有兩種使用方法,直接傳入要執行的方法或從Thread繼承並覆蓋run()。推薦使用方法一,將目標函式作為target引數傳入,非常簡單實用。
# coding:utf-8 import threading import time #方法一:將要執行的方法作為引數傳給Thread的構造方法 def action(arg): time.sleep(1) print 'the arg is:%s\r' %arg for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.start() print 'main thread end!' #方法二:從Thread繼承,並重寫run() class MyThread(threading.Thread): def __init__(self,arg): super(MyThread, self).__init__()#注意:一定要顯式的呼叫父類的初始化函式。 self.arg=arg def run(self):#定義每個執行緒要執行的函式 time.sleep(1) print 'the arg is:%s\r' % self.arg for i in xrange(4): t =MyThread(i) t.start() print 'main thread end!'
相關方法:
構造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 執行緒名;
args/kwargs: 要傳入方法的引數。
例項方法:
isAlive(): 返回執行緒是否在執行。正在執行指啟動後、終止前。
get/setName(name): 獲取/設定執行緒名。
start(): 執行緒準備就緒,等待CPU排程
is/setDaemon(bool): 將該子執行緒設定為父執行緒的守護執行緒(預設為非守護執行緒(False))。(需要線上程start之前設定)
關於“守護”的含義,我們可以這樣理解,子執行緒為父執行緒的守護執行緒,意思是說子執行緒要守著父執行緒,一旦父執行緒執行完畢,也就不需要“守護”了,所以此時子執行緒就要結束。
True: 設定該子程序為父程序的守護程序,即後臺執行緒。主執行緒執行過程中,子執行緒也在進行,主執行緒執行完畢後,子執行緒不論成功與否,主執行緒和子執行緒均停止。
False:設定該子程序為父程序的非守護程序,即前臺程序。主執行緒執行過程中,子執行緒也在進行,主執行緒程式碼執行完畢後,仍需要等待子執行緒也執行完成後,主執行緒才會停止。
start(): 啟動執行緒。
join([timeout]): 阻塞當前上下文環境的執行緒,直到呼叫此方法的執行緒終止或到達指定的timeout(可選引數)。
1.2.1 關鍵引數setDaemon
該引數是設定執行緒屬性,規定當前執行緒是否屬於守護執行緒(預設為非守護執行緒(False))。(需要線上程start之前設定)
- True: 設定該子程序為父程序的守護程序,即後臺執行緒。主執行緒執行過程中,子執行緒也在進行,主執行緒執行完畢後,子執行緒不論成功與否,主執行緒和子執行緒均停止。
- False:設定該子程序為父程序的非守護程序,即前臺程序。主執行緒執行過程中,子執行緒也在進行,主執行緒程式碼執行完畢後,仍需要等待子執行緒也執行完成後,主執行緒才會停止。
關於setDaemon,預設子執行緒屬於非守護執行緒,即主執行緒要等待所有子執行緒執行完之後,才停止程式。
# coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName() print 'the arg is:%s\r' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.start() print 'main_thread end!' main_thread end! sub thread start!the thread name is:Thread-2 the arg is:1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:2 the arg is:3 Process finished with exit code 0 可以看出,建立的4個“前臺”執行緒,主執行緒執行過程中,前臺執行緒也在進行,主執行緒執行完畢後,等待前臺執行緒也執行完成後,程式停止
該例子驗證了setDeamon(False)(預設)非守護執行緒,主執行緒執行過程中,子執行緒也在進行,主執行緒執行完畢後,等待子執行緒也執行完成後,主執行緒停止。
設定setDeamon=True時:
# coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName() print 'the arg is:%s\r' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True)#設定執行緒為後臺執行緒 t.start() print 'main_thread end!' main_thread end! 可以看出,主執行緒執行完畢後,後臺執行緒不管是成功與否,主執行緒均停止
驗證了setDeamon(True)守護執行緒,主執行緒執行過程中,守護執行緒也在進行,主執行緒執行完畢後,子執行緒不論成功與否,均與主執行緒一起停止。
1.2.2 關鍵引數join
阻塞當前上下文環境的執行緒,直到呼叫此方法的執行緒終止或到達指定的timeout(可選引數)。即當子程序的join()函式被呼叫時,主執行緒就被阻塞住了,意思為不再繼續往下執行。
值得注意的是,由於join()會阻塞其他函式,如果我們要用for迴圈觸發多個執行緒的執行,start()要和join()分開(用兩個for迴圈,先用第一個for迴圈將全部子執行緒start(),再用第二個for迴圈將全部子執行緒join),不然會讓多執行緒並行執行,變成多執行緒依次執行:
- 因為如果其他執行緒還沒有start(),那麼由於start()操作屬於主執行緒的呼叫,那麼start()會被阻塞,我們原本想要的多執行緒並行執行會變成多執行緒依次執行。
- 如果此時其他執行緒已經start()了,那麼join()函式由於是對子執行緒的操作,不屬於主執行緒,則不會被阻塞。
#coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName() print 'the arg is:%s ' %arg time.sleep(1) thread_list = [] #執行緒存放列表 for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True) thread_list.append(t) for t in thread_list: t.start() for t in thread_list: t.join()
print("main_thread end!")
#Output: sub thread start!the thread name is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 設定join之後,主執行緒等待子執行緒全部執行完成後或者子執行緒超時後,主執行緒才會從被阻塞的地方繼續執行。
驗證了 join()阻塞當前上下文環境的執行緒,直到呼叫此方法的執行緒終止或到達指定的timeout,即使設定了setDeamon(True)主執行緒依然要等待子執行緒結束。
使用例子(join不妥當的用法,使多執行緒程式設計順序執行)
#coding:utf-8 import threading import time def action(arg): time.sleep(1) print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName() print 'the arg is:%s ' %arg time.sleep(1) for i in xrange(4): t =threading.Thread(target=action,args=(i,)) t.setDaemon(True) t.start() t.join() print 'main_thread end!' join不妥當用法 sub thread start!the thread name is:Thread-1 the arg is:0 sub thread start!the thread name is:Thread-2 the arg is:1 sub thread start!the thread name is:Thread-3 the arg is:2 sub thread start!the thread name is:Thread-4 the arg is:3 main_thread end! Process finished with exit code 0 可以看出此時,程式只能順序執行,每個執行緒都被上一個執行緒的join阻塞,使得“多執行緒”失去了多執行緒意義。 執行結果
1.3 Timer類
Timer(定時器)是Thread的派生類,用於在指定時間後呼叫一個方法。
構造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的時間
function: 要執行的方法
args/kwargs: 方法的引數
例項方法:
Timer從Thread派生,沒有增加例項方法。
# encoding: UTF-8
import threading
def func():
print (hello timer!)
if __name__ == "__main__"
timer = threading.Timer(5, func)
timer.start()
該例子效果為延遲5秒執行
2 執行緒同步
執行緒同步是藉由threading模組的以下類實現的:Condition,Event,Local.
2.1 Condition類
我們先關注一下Condition類一般用於什麼場景:
執行緒A需要等某個條件成立才能繼續往下執行,現在這個條件不成立,執行緒A就阻塞等待,而執行緒B在執行過程中使這個條件成立了,就喚醒執行緒A繼續執行。在pthread庫中通過條件變數(Condition Variable)來阻塞等待一個條件,或者喚醒等待這個條件的執行緒。
通俗的講,Condition類適合於生產者,消費者模型。 即Condition很適合那種主動休眠,被動喚醒的場景。 Condition使用難度要高於mutex,一不注意就會被死鎖,所以很考驗對condition的理解。
首先我們知道python下的執行緒是真實的執行緒,底層用的是pthread。pthread內部Condition條件變數有兩個關鍵函式, await和signal方法,對應python threading Condition是wait和notify方法。
一個 Condition例項的內部實際上維護了兩個佇列, 一個是等待鎖佇列(實際上mutex內部其實就是維護這個等待鎖佇列) , 另一個佇列可以叫等待條件佇列,在這佇列中的節點都是由於(某些條件不滿足而)執行緒自身呼叫wait方法阻塞的執行緒(記住是自身阻塞)。最重要的Condition方法是wait和 notify方法。另外condition還需要lock的支援, 如果你建構函式沒有指定lock,condition會預設給你配一個rlock。構造方法:
Condition([lock/rlock])
例項方法:
acquire([timeout])/release(): 呼叫關聯的鎖的相應方法。
wait([timeout]): 呼叫這個方法將使執行緒進入Condition的等待池等待通知,並釋放鎖。使用前執行緒必須已獲得鎖定,否則將丟擲異常。
notify(): 呼叫這個方法將從等待池挑選一個執行緒並通知,收到通知的執行緒將自動呼叫acquire()嘗試獲得鎖定(進入鎖定池);其他執行緒仍然在等待池中。呼叫這個方法不會釋放鎖定。使用前執行緒必須已獲得鎖定,否則將丟擲異常。
notifyAll(): 呼叫這個方法將通知等待池中所有的執行緒,這些執行緒都將進入鎖定池嘗試獲得鎖定。呼叫這個方法不會釋放鎖定。使用前執行緒必須已獲得鎖定,否則將丟擲異常。
notify(signal)方法:
1. 將條件佇列的隊首節點取出,放入等待鎖佇列的隊尾 2. 喚醒節點對應的執行緒.注: notify ( signal ) 可以把wait佇列的那些執行緒給喚醒起來。
下面給一個生產者-消費者模型的例子:
#/usr/bin/python3 # encoding: UTF-8 import threading import time # 商品 product = None # 條件變數 con = threading.Condition() # 生產者方法 def produce(): global product if con.acquire(): while True: if product is None: print("produce the product...") product = 'anything' # 通知消費者,商品已經生產 con.notify() else: print("Producer: product is not None") # 等待通知 con.wait() print("Producer: Resume from wait...") time.sleep(2) # 消費者方法 def consume(): global product if con.acquire(): while True: if product is not None: print("consume the product...") product = None # 通知生產者,商品已經沒了 con.notify() else: print("Cosumer: product is None") # 等待通知 con.wait() print("Cosumer: Resume from wait...") time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
結果為
Cosumer: product is None
produce the product Cosumer: Resume from wait...
cosume the product
Producuer: Resume from wait...
produce the product
Cosumer: Resume from wait...
cosume the product
注意:con.wait()一定要在if判斷的外面,因為一開始的時候,兩個子執行緒都獲得了鎖。如果沒有con.wait()釋放鎖並等待通知,則另一個暫時沒有獲得鎖許可權的執行緒,會一直被阻塞。整個生產者-消費者模型也就跑不起來了。下面是跑步起來的例子:
#/usr/bin/python3 # encoding: UTF-8 import threading import time # 商品 product = None # 條件變數 con = threading.Condition() # 生產者方法 def produce(): global product if con.acquire(): while True: if product is None: print("produce the product...") product = 'anything' # 通知消費者,商品已經生產 con.notify() con.wait() print("Producer: Resume from wait...") time.sleep(2) else: print("Producer: product is not None") # 等待通知 #con.wait() #print("Producer: Resume from wait...") # time.sleep(2) # 消費者方法 def consume(): global product if con.acquire(): while True: if product is not None: print("consume the product...") product = None # 通知生產者,商品已經沒了 con.notify() con.wait() print("Cosumer: Resume from wait...") time.sleep(2) else: print("Cosumer: product is None") # 等待通知 #con.wait() # print("Cosumer: Resume from wait...") # time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=produce) t2 = threading.Thread(target=consume) t2.start() t1.start()
結果為:
Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None Cosumer: Product is None
2.2 Event類
Event(事件)是最簡單的執行緒通訊機制之一:一個執行緒通知事件,其他執行緒等待事件。Event內建了一個初始為False的標誌,當呼叫set()時設為True,呼叫clear()時重置為 False。wait()將阻塞執行緒至等待阻塞狀態。
Event其實就是一個簡化版的 Condition。Event沒有鎖,無法使執行緒進入同步阻塞狀態。
構造方法:
Event()
例項方法:
isSet(): 當內建標誌為True時返回True。
set(): 將標誌設為True,並通知所有處於等待阻塞狀態的執行緒恢復執行狀態。
clear(): 將標誌設為False。
wait([timeout]): 如果標誌為True將立即返回,否則阻塞執行緒至等待阻塞狀態,等待其他執行緒呼叫set()。
下面是例子:
# encoding: UTF-8 import threading import time event = threading.Event() def func(): # 等待事件,進入等待阻塞狀態 print '%s wait for event...' % threading.currentThread().getName() event.wait() # 收到事件後進入執行狀態 print '%s recv event.' % threading.currentThread().getName()
if __name__ == "__main__":
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()
time.sleep(2)
print 'MainThread set event.'
event.set()
結果為
Thread-1 wait for event... Thread-2 wait for event... #2秒後。。。 MainThread set event. Thread-1 recv event. Thread-2 recv event.
2.3 Local類
local是一個小寫字母開頭的類,用於管理 thread-local(執行緒區域性的)資料。對於同一個local,執行緒無法訪問其他執行緒設定的屬性;執行緒設定的屬性不會被其他執行緒設定的同名屬性替換。
可以把local看成是一個“執行緒-屬性字典”的字典,local封裝了從自身使用執行緒作為 key檢索對應的屬性字典、再使用屬性名作為key檢索屬性值的細節。
# encoding: UTF-8 import threading local = threading.local() local.tname = 'main' def func(): local.tname = 'notmain' print local.tname if __name__ == "__main__": t1 = threading.Thread(target=func) t1.start() t1.join() print(local.tname)