1. 程式人生 > 實用技巧 >併發程式設計筆記(2)——訊號量、事件、佇列(程序間的通訊)

併發程式設計筆記(2)——訊號量、事件、佇列(程序間的通訊)

內容目錄

  • 訊號量
  • 事件
  • 佇列

內容詳細

訊號量(重點)

  • 可以規定有多少程序使用關鍵程式碼,其餘程序阻塞,直到有子程序釋放

  • 示例:模擬KTV使用,同時只有4個人使用

    import random
    import time
    from multiprocessing import Process
    from multiprocessing import Semaphore  #使用訊號量模組
    
    def ktv(i,sem):
        sem.acquire()       #獲取鑰匙,只有4個程序可以執行,後續阻塞
        print('%s走進KTV'%i)
        time.sleep(random.randint(5,10))    #模擬程序使用的時間(隨機秒數)
        print('%s走出KTV'%i)
        sem.release()       #程序結束後還鑰匙
    
    if __name__ == '__main__':
        sem = Semaphore(4)      #例項化訊號量,規定多少程序可以使用
        for i in range(20):
            p = Process(target=ktv,args=(i,sem))
            p.start()
    

事件(重點)

  • 通過一個訊號來控制多個程序同時執行或阻塞

  • 一個事件被建立之後,預設是阻塞狀態

    from multiprocessing import Event   #使用事件模組
    
    # 一個訊號可以使所有的程序都進入阻塞狀態
    # 也可以控制所有的程序解除阻塞
    # 一個事件被建立之後,預設是阻塞狀態
    
    e = Event()     #建立一個事件
    print(e.is_set())   #檢視一個事件的狀態,預設被設定成阻塞
    e.set()         #將這個事件的狀態改為True
    print(e.is_set())
    e.wait()        #依據e.is_set()的值來決定是否阻塞的
    print(123456)
    
    e.clear()       #將這個事件的狀態改為False
    print(e.is_set())
    e.wait()        #此時程式為阻塞狀態,等待訊號變為True
    print('*'*10)
    
  • set 和 clear

    • 分別用來修改一個事件的狀態,True或者False
  • is_set用來檢視一個事件的狀態

  • wait 是依據事件的狀態來決定自己是否阻塞

    • is_set()狀態為False是阻塞,True是不阻塞
  • 事件示例:經典的紅綠燈事件

    # 綠燈來了車通過,紅燈車為阻塞狀態
    import time
    import random
    from multiprocessing import Event,Process
    
    def cars(e,i):
        if not e.is_set():
            print('car%i在等待'%i)
            e.wait()            # 阻塞 直到得到一個事件狀態變為True的訊號
        print('\033[0;32;40mcar%s通過\033[0m'%i)
    
    def light(e):
        while True:
            if e.is_set():
                e.clear()
                print('\033[31m紅燈亮了\033[0m')
            else:
                e.set()
                print('\033[32m綠燈亮了\033[0m')
            time.sleep(2)
    
    if __name__ == '__main__':
        e = Event()
        traffic = Process(target=light,args=(e,))
        traffic.start()
        for i in range(20):
            car = Process(target=cars,args=(e,i))
            car.start()
            time.sleep(random.randint(1,5))
    

佇列 --- 程序間的通訊

  • 程序間通訊-IPC(使用Queue模組)

    import time
    from multiprocessing import Queue   #使用佇列模組
    q = Queue(5)        #設定佇列中只能有5個程序或資料
    q.put(1)        #往佇列中放入1
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)        #此時佇列已經滿了,如果再往裡放則為阻塞狀態
    print(q.full())     # 查詢佇列是否滿了(True為滿了,False為不滿)
    
    print(q.get())  #從佇列中取出
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())  #此時佇列已經空了,如果繼續取出,則為阻塞狀態
    print(q.empty())    # 查詢佇列是否為空(True為空,False為未空)
    
    q.get_nowait()  #強制取值,此時會報queue.Empty錯誤,表示佇列已經空
    
    # while True:
    #     try:
    #         q.get_nowait()
    #     except:
    #         print('佇列已空')
    #         time.sleep(0.5)
    #         while迴圈1秒會迴圈上千次,損耗記憶體,此時加上睡眠時間以避免記憶體過度損耗
    
  • 注意:

    • q.empty()為檢查佇列是否為空,有不可靠因素。此方法是實時檢測佇列是否為空,如果此時生產者有往佇列中正在新增的程序時,佇列此時為空
  • 簡單的多程序佇列模型

    from multiprocessing import Queue,Process
    def produce(q):          #生產資料
        q.put('hello')
    
    def consume(q):          #取出資料
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()     #此隊列表示沒有限制
        p = Process(target=produce,args=(q,))
        p.start()
        c = Process(target=consume,args=(q,))
        c.start()
    
  • 經典的生產者消費者模型

    • 為解決供需不平衡的問題
    #plan 1:不完整,存在BUG
    import time
    import random
    from multiprocessing import Queue,Process
    
    def consumer(q,name):
        while True:
            food = q.get()
            if food is None:
                #如果獲取到None則終止迴圈,問題bug:此時如果多個消費者取佇列中的None,
                #只能是第一個程序能取到,剩餘程序為阻塞狀態。需要往佇列中新增與消費者數量相匹配的None。
                print('%s獲取到一個空'%name)
                break
            print( '\033[31m%s消費了%s\033[0m'%(name,food))
            time.sleep(random.randint(1,3))
    
    def producer(name,food,q):      # 建立生產者
        for i in range(4):
            time.sleep(random.randint(1,3))
            foods = '%s生產了%s個%s'%(name,i+1,food)
            print(foods)
            q.put(foods)
    
    if __name__ == '__main__':
        q = Queue(20)               # 建立佇列,所有程序裡最大限制20
        p1 = Process(target=producer,args=('alec','包子',q))
        p1.start()
        p2 = Process(target=producer,args=('yazhou','玉米',q))
        p2.start()
        c1 = Process(target=consumer,args=(q,'喳喳輝'))
        c1.start()
        c2 = Process(target=consumer,args=(q,'古天樂'))
        c2.start()
        p1.join()           #讓生產者程序迴歸到主程式程序
        p2.join()
        q.put(None)         #佇列中放入兩個None,讓消費者取到後結束阻塞狀態
        q.put(None)
    
  • 使用Joinablequeue模組

#plan 2:
import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(q,name):
    while True:
        food = q.get()
        print( '\033[31m%s消費了%s\033[0m'%(name,food))
        time.sleep(random.randint(1,3))
        q.task_done()   # 每執行一次該命令都會被記錄下來,直到佇列中的所有資料都執行完此命令
def producer(name,food,q):   # 建立生產者
    for i in range(4):
        time.sleep(random.randint(1,3))
        foods = '%s生產了%s個%s'%(name,i+1,food)
        print(foods)
        q.put(foods)
    q.join()        # 程序延遲了,進入阻塞狀態,直到一個佇列中的所有資料全部被處理完畢

if __name__ == '__main__':
    q = JoinableQueue(20)               # 建立佇列,所有程序裡最大限制20
    p1 = Process(target=producer,args=('alec','包子',q))
    p2 = Process(target=producer,args=('yazhou','玉米',q))
    p1.start()
    p2.start()

    c1 = Process(target=consumer,args=(q,'喳喳輝'))
    c2 = Process(target=consumer,args=(q,'古天樂'))
    c1.daemon = True        #設定為守護程序,主程序中的程式碼執行完畢之後,子程序自動結束
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()           #讓生產者程序迴歸到主程式程序
    p2.join()

此模組執行流程:

從消費者端看:

  • 每次獲取一個數據
  • 處理一個數據
  • 傳送一個記號:標誌一個數據被處理成功

從生產者端看:

  • 每次生產一個數據,並放入佇列中

  • 對佇列中每一個數據刻上記號

  • 當生產者全部生產完畢後,傳送join訊號,程序為阻塞狀態:

    • 此時生產者已經停止生產資料了
    • 等待之前被刻上記號的資料都被消費完
    • 當資料都被處理完時,join阻塞結束
  • 1.consumer中把所有的任務都消耗完

  • 2.producer端的join感知到,停止阻塞

  • 3.所有的producer程序結束# 主程序中的p.join結束

  • 4.主程序中程式碼結束# 守護程序(消費者的程序)結束

總結:

  • 消費者模型