執行緒,生產者消費者模型
什麼是執行緒:
1.執行緒是cpu最小的執行單位
2.程序是資源單位
3.如果將作業系統比作工廠的話,程序是車間,執行緒是流水線,而cpu為電源
開啟一個程序預設有一個執行緒即主執行緒
多執行緒的好處:①當遇到IO操作時,可以將主程序分為多個執行緒,cpu遇到IO可能會切換到該程序的其他執行緒,cpu線上程間切換,
故擁有就緒態多的執行緒的程序被執行的概率變高,提升了程序的效率
執行緒的兩種開啟方式:①例項化Thread類②自定義MYthread類,繼承Thread類,覆蓋run方法
什麼時候需要開啟多執行緒,什麼時候需要開啟多程序:
①當我們只有1個cpu即單核:
程序對於作業系統的資源耗費非常高,而執行緒相反會場低(比程序低10-100倍),只有一個cpu的話,開啟多執行緒要比多程序效率更高
②當我們有多個cpu
1.當我們的多程序都是IO時間較長時,開啟多程序和多執行緒沒有差別,因為CPU遇到IO都會切換,IO等待較長時,多程序和多執行緒都可以在等待時間內將其他非IO程式完成
2.當我們的多程序都cpu密集型,即運算多時,多程序要比多執行緒運算的快
同一個程序的多個執行緒是共享資源的
也就是說當我們多個執行緒對共有的資源修改時會導致資料的錯亂,故需要執行緒互斥鎖來保證資料安全,使用方法與程序鎖一樣
守護執行緒:
因為執行緒是共享資料資源的,為了保證資料安全,主執行緒會在所有子執行緒執行完畢後結束,故主執行緒的守護執行緒會在所有非守護執行緒執行完畢後結束
訊號量:
semaphore
互斥鎖是一個種特殊的訊號量,即數量限制為1,
訊號量同一時刻可以允許規定數量的程式同時共享資料
生產者與消費者模型問題:
消費者不知道生產者何時生產完畢,一直在取容器中的產品消費,當生產者不再生產時,消費者會一直處於阻塞狀態
解決方法
①生產者在容器中放入None標記,當消費者獲取到None標記時,則停止從容器中取產品
併發的起動,但生產者要優先生產完畢,並根據消費者數量放入等額的None即停止標記
from threading import Thread import time,random from multiprocessing import Queue q = Queue() def producter(name): for i in range(5): time.sleep(random.randint(1,2)) res = "%s的第%s個包子"%(name,i+1) print("%s生產了第%s個包子"%(name,i+1)) q.put(res) def customer(name): while True: time.sleep(random.randint(1,2)) res = q.get() if not res :break print("%s正在吃%s"%(name,res)) if __name__ == '__main__': p1 = Thread(target=producter,args = ("zb",)) p2 = Thread(target=producter,args = ("xzh",)) p3 = Thread(target=producter,args = ("lqq",)) c1 = Thread(target=customer,args = ("egon",)) c2 = Thread(target=customer,args = ("alex",)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None)
②使用可join的佇列即JoinableQueue
同理:讓生產者優先順序提高,先告知q.join()總產品的數量,當消費者獲取時產品,q.task_done(),給q.join()發一個通知,
當數量吻合時,q.join()執行完畢,則主執行緒執行完畢,將消費者設定為主執行緒的守護執行緒,則消費者同時掛掉,不會再去獲取
from threading import Thread
import time,random
from multiprocessing import JoinableQueue
q = JoinableQueue()
def producter(name):
for i in range(5):
time.sleep(random.randint(1,2))
res = "%s的第%s個包子"%(name,i+1)
print("%s生產了第%s個包子"%(name,i+1))
q.put(res)
def customer(name):
while True:
time.sleep(random.randint(1,2))
res = q.get()
if not res :break
print("%s正在吃%s"%(name,res))
q.task_done()
if __name__ == '__main__':
p1 = Thread(target=producter,args = ("zb",))
p2 = Thread(target=producter,args = ("xzh",))
p3 = Thread(target=producter,args = ("lqq",))
c1 = Thread(target=customer,args = ("egon",))
c2 = Thread(target=customer,args = ("alex",))
p1.start()
p2.start()
p3.start()
c1.daemon =True
c1.start()
c2.daemon = True
c2.start()
p1.join()
p2.join()
p3.join()
q.join()