1. 程式人生 > >執行緒,生產者消費者模型

執行緒,生產者消費者模型

什麼是執行緒:
1.執行緒是cpu最小的執行單位
2.程序是資源單位
3.如果將作業系統比作工廠的話,程序是車間,執行緒是流水線,而cpu為電源
開啟一個程序預設有一個執行緒即主執行緒

多執行緒的好處:①當遇到IO操作時,可以將主程序分為多個執行緒,cpu遇到IO可能會切換到該程序的其他執行緒,cpu線上程間切換,
故擁有就緒態多的執行緒的程序被執行的概率變高,提升了程序的效率

執行緒的兩種開啟方式:①例項化Thread類②自定義MYthread類,繼承Thread類,覆蓋run方法

什麼時候需要開啟多執行緒,什麼時候需要開啟多程序:
①當我們只有1個cpu即單核:
程序對於作業系統的資源耗費非常高,而執行緒相反會場低(比程序低10-100倍),只有一個cpu的話,開啟多執行緒要比多程序效率更高
②當我們有多個cpu
1.當我們的多程序都是IO時間較長時,開啟多程序和多執行緒沒有差別,因為CPU遇到IO都會切換,IO等待較長時,多程序和多執行緒都可以在等待時間內將其他非IO程式完成
2.當我們的多程序都cpu密集型,即運算多時,多程序要比多執行緒運算的快

同一個程序的多個執行緒是共享資源的
也就是說當我們多個執行緒對共有的資源修改時會導致資料的錯亂,故需要執行緒互斥鎖來保證資料安全,使用方法與程序鎖一樣

守護執行緒:
因為執行緒是共享資料資源的,為了保證資料安全,主執行緒會在所有子執行緒執行完畢後結束,故主執行緒的守護執行緒會在所有非守護執行緒執行完畢後結束

訊號量:
semaphore
互斥鎖是一個種特殊的訊號量,即數量限制為1,
訊號量同一時刻可以允許規定數量的程式同時共享資料

生產者與消費者模型問題:
消費者不知道生產者何時生產完畢,一直在取容器中的產品消費,當生產者不再生產時,消費者會一直處於阻塞狀態
解決方法
①生產者在容器中放入None標記,當消費者獲取到None標記時,則停止從容器中取產品
併發的起動,但生產者要優先生產完畢,並根據消費者數量放入等額的None即停止標記

from threading import Thread
import time,random
from multiprocessing import Queue
q = Queue()

def producter(name):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res = "%s的第%s個包子"%(name,i+1)
        print("%s生產了第%s個包子"%(name,i+1))
        q.put(res)

def customer(name):
    while True:
        time.sleep(random.randint(1,2))
        res = q.get()
        if not res :break
        print("%s正在吃%s"%(name,res))
if __name__ == '__main__':
    p1 = Thread(target=producter,args = ("zb",))
    p2 = Thread(target=producter,args = ("xzh",))
    p3 = Thread(target=producter,args = ("lqq",))
    c1 = Thread(target=customer,args = ("egon",))
    c2 = Thread(target=customer,args = ("alex",))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)

②使用可join的佇列即JoinableQueue
同理:讓生產者優先順序提高,先告知q.join()總產品的數量,當消費者獲取時產品,q.task_done(),給q.join()發一個通知,
當數量吻合時,q.join()執行完畢,則主執行緒執行完畢,將消費者設定為主執行緒的守護執行緒,則消費者同時掛掉,不會再去獲取

from threading import Thread
import time,random
from multiprocessing import JoinableQueue
q = JoinableQueue()

def producter(name):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res = "%s的第%s個包子"%(name,i+1)
        print("%s生產了第%s個包子"%(name,i+1))
        q.put(res)

def customer(name):
    while True:
        time.sleep(random.randint(1,2))
        res = q.get()
        if not res :break
        print("%s正在吃%s"%(name,res))
        q.task_done()
if __name__ == '__main__':
    p1 = Thread(target=producter,args = ("zb",))
    p2 = Thread(target=producter,args = ("xzh",))
    p3 = Thread(target=producter,args = ("lqq",))
    c1 = Thread(target=customer,args = ("egon",))
    c2 = Thread(target=customer,args = ("alex",))
    p1.start()
    p2.start()
    p3.start()
    c1.daemon =True
    c1.start()
    c2.daemon = True
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    q.join()