Python---執行緒的鎖
1、同步鎖
為了防止讀取到髒資料,對臨界資源進行加鎖,將並行被迫改為序列。通過threading.Lock()方法建立一把鎖。
acquire() 方法:只有一個執行緒能成功的獲取鎖,按先後順序 其他執行緒只能等待。release() 方法:執行緒釋放。這把鎖不允許在同一執行緒中被多次acquire()。
import threading import time def check(): global number lock.acquire() # 對臨界資源進行加鎖 temp = number time.sleep(0.01) number = temp-1 lock.release() l = [] number = 100 # 例項一把鎖 lock = threading.Lock() for i in range(100): t = threading.Thread(target=check) t.start() l.append(t) for t in l: t.join() print(number)
2、遞迴鎖(解決死鎖問題)
遞迴鎖: RLock允許在同一執行緒中被多次acquire() 加鎖,其內部存在著一個計數器,只要計數器大於0就沒人能拿到這把鎖。
acquire()方法和release()方法必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的鎖。
通過 rlock = threading.Rlock() 方法獲得一把遞迴鎖。
死鎖:當一個或多個程序等待系統資源,而資源又被程序本身或其它程序佔用時,就形成了死鎖。
import time import threading class MyThread(threading.Thread): def run(self): self.actionA() self.actionB() def actionA(self): LockA.acquire() print("got Alock"+str(threading.current_thread())) time.sleep(1) LockB.acquire() print("got Block" + str(threading.current_thread())) time.sleep(0.) LockB.release() LockA.release() def actionB(self): LockB.acquire() print("got Block"+str(threading.current_thread())) time.sleep(1) LockA.acquire() print("got Alock" + str(threading.current_thread())) time.sleep(0.1) LockA.release() LockB.release() if __name__ == '__main__': LockA = threading.Lock() LockB = threading.Lock() L = [] for i in range(5): t = MyThread() t.start() L.append(t) for t in L: t.join() print('ending')
當一個執行緒執行完actionA方法以後,會繼續執行actionB方法,此時並不會產生死鎖。當第二個執行緒開始執行的時候,可以獲得A鎖,當獲取B鎖的時候,返現B鎖已經被第一個執行緒佔用,而第一個執行緒也在請求A鎖。兩個執行緒僵持不下所以發生死鎖。當加上遞迴鎖以後就解決此問題。
加上遞迴鎖:
import time import threading class MyThread(threading.Thread): def run(self): self.actionA() self.actionB() def actionA(self): LockA.acquire() print("got Alock"+str(threading.current_thread())) time.sleep(1) LockA.acquire() print("got Block" + str(threading.current_thread())) time.sleep(0.) LockA.release() LockA.release() def actionB(self): LockA.acquire() print("got Block"+str(threading.current_thread())) time.sleep(1) LockA.acquire() print("got Alock" + str(threading.current_thread())) time.sleep(0.1) LockA.release() LockA.release() if __name__ == '__main__': # 獲得一把鎖 LockA = threading.RLock() L = [] # 例項執行緒物件 for i in range(5): t = MyThread() t.start() L.append(t) for t in L: t.join() print('ending')
當第一個執行緒執行完actionA方法以後再執行actionB方法會再次請求一把相同的鎖,和其他執行緒一同競爭獲得該鎖。從而解決了此問題。
3、訊號量
保證兩個或多個關鍵程式碼段不被併發呼叫,也是一種鎖。threading.Semaphore(size) 建立一個訊號量。也有acquire()、release()方法和上面相同。
import threading
import time
def check():
# 當進來一個變數,就會減一,一共有5把鎖
if semaphore.acquire():
print(threading.current_thread())
time.sleep(3)
# 每釋放一把鎖就會加一
semaphore.release()
# 最後顯示會每三秒出5個結果
if __name__ == '__main__':
L = []
# 建立5個訊號量,訊號量也是一種鎖
semaphore = threading.Semaphore(5)
for i in range(100):
t = threading.Thread(target=check)
t.start()
L.append(t)
for t in L:
t.join()
print('end')
4、同步物件Event
threading.event()建立一個同步物件,類似於訊號量機制,只不過訊號量為一。
wait()方法:沒有設定標誌位就會卡住,一旦被設定等同於pass。set()方法:設定標誌位。clear()方法:清楚標誌位。is_set()方法:是否設定。
import time
import threading
class Boss(threading.Thread):
def run(self):
print("加班加班")
# 檢驗是否設定標誌位
print(event.is_set())
# 設定標誌位
event.set()
time.sleep(3)
# 清空標誌位
event.clear()
print('加班結束')
event.set()
class Worker(threading.Thread):
def run(self):
# 等待設定標誌位,如果設定,此語句相當於pass
event.wait()
print("命苦啊")
time.sleep(1)
event.clear()
event.wait()
print("OH yeah")
if __name__ == '__main__':
L = []
# 建立一個同步物件
event = threading.Event()
for i in range(5):
L.append(Worker())
L.append(Boss())
for t in L:
t.start()
for t in L:
t.join()
print("end")
5、生產者消費者模型
(1):佇列:執行緒佇列一種資料結構,是多執行緒的、安全的。列表是執行緒不安全的。
佇列的模式:1、先進先出 2、先進後出 3、按優先順序。
import queue
# 建立一個佇列先進先出
q = queue.Queue(3)
# q = queue.LifoQueue() 建立先進後出佇列。(棧)later in first out
# q = queue.PriorityQueue() 建立一個按優先順序順序的佇列
# 新增資料 當超過佇列大小時,就會卡住put。後面如果加上false引數就會在滿的時候報錯。
q.put(1)
q.put("csdn")
q.put({"name":"2333"})
q.put("baba")
# print(q.qsize()) 不是最大值,而是當前佇列有多少個元素。
# print(q.maxsize) 佇列的最長度
# print(q.full()) 隊滿
# print(q.empty()) 隊空
while 1:
print("+++++++++")
# 取資料,沒資料的時候會一直卡住,一直等到有新增值。(另外的一個執行緒可以繼續新增值)。後面如果加上false引數就會在滿的時候報錯
print(q.get())
print("---------")
當前程式會卡在put()方法。
當使用優先順序佇列時在put()資料時一同寫上優先順序,數值越小優先順序越高。
q.put([1,1])
(2)Producer-consumer problem:
task_done()方法:完成一項工作後向另一個執行緒發訊號。join()方法:等到佇列為空時,在執行別的操作。成對出現,否則沒意義。
import time, threading, queue, random
def producer(name):
count = 0
while count < 10:
print("making....")
# 隨機數範圍1--2
time.sleep(random.randrange(3))
q.put(count)
print("{}做好了第{}個包子".format(name,count))
count += 1
# q.task_done()
q.join()
def consumer(name):
count = 0
while count < 10:
time.sleep(random.randrange(4))
print('waiting')
# q.join() 沒有訊號會卡住,有人給訊號就往下執行
data = q.get()
q.task_done()
print("{}正在吃第{}個包子".format(name,data))
count += 1
if __name__ == '__main__':
q = queue.Queue()
t1 = threading.Thread(target=producer,args=('A君',))
t2 = threading.Thread(target=consumer,args=('B君',))
t3 = threading.Thread(target=consumer,args=('C君',))
t4 = threading.Thread(target=consumer,args=('D君',))
t1.start()
t2.start()
t3.start()
t4.start()
task_done : 用於 get 的後續呼叫.告訴佇列任務處理完成.當然你也可以不呼叫get。
join:阻塞操作,直到佇列所有的任務都處理,換句話說,當佇列裡面沒有東西的時候才會向下執行,否則會阻塞。也就是說:往裡 put 幾次,就要呼叫task_done幾次。然而task_done()並不會使佇列長度-1,而是會向佇列傳送訊號,真正使佇列長度減一的是get()操作。
put佇列完成的時候千萬不能用task_done(),否則會報錯,因為該方法僅僅表示get成功後,執行的一個標記。