併發程式設計筆記(2)——訊號量、事件、佇列(程序間的通訊)
阿新 • • 發佈:2020-08-10
內容目錄
- 訊號量
- 事件
- 佇列
內容詳細
訊號量(重點)
-
可以規定有多少程序使用關鍵程式碼,其餘程序阻塞,直到有子程序釋放
-
示例:模擬KTV使用,同時只有4個人使用
import random import time from multiprocessing import Process from multiprocessing import Semaphore #使用訊號量模組 def ktv(i,sem): sem.acquire() #獲取鑰匙,只有4個程序可以執行,後續阻塞 print('%s走進KTV'%i) time.sleep(random.randint(5,10)) #模擬程序使用的時間(隨機秒數) print('%s走出KTV'%i) sem.release() #程序結束後還鑰匙 if __name__ == '__main__': sem = Semaphore(4) #例項化訊號量,規定多少程序可以使用 for i in range(20): p = Process(target=ktv,args=(i,sem)) p.start()
事件(重點)
-
通過一個訊號來控制多個程序同時執行或阻塞
-
一個事件被建立之後,預設是阻塞狀態
from multiprocessing import Event #使用事件模組 # 一個訊號可以使所有的程序都進入阻塞狀態 # 也可以控制所有的程序解除阻塞 # 一個事件被建立之後,預設是阻塞狀態 e = Event() #建立一個事件 print(e.is_set()) #檢視一個事件的狀態,預設被設定成阻塞 e.set() #將這個事件的狀態改為True print(e.is_set()) e.wait() #依據e.is_set()的值來決定是否阻塞的 print(123456) e.clear() #將這個事件的狀態改為False print(e.is_set()) e.wait() #此時程式為阻塞狀態,等待訊號變為True print('*'*10)
-
set 和 clear
- 分別用來修改一個事件的狀態,True或者False
-
is_set用來檢視一個事件的狀態
-
wait 是依據事件的狀態來決定自己是否阻塞
- is_set()狀態為False是阻塞,True是不阻塞
-
事件示例:經典的紅綠燈事件
# 綠燈來了車通過,紅燈車為阻塞狀態 import time import random from multiprocessing import Event,Process def cars(e,i): if not e.is_set(): print('car%i在等待'%i) e.wait() # 阻塞 直到得到一個事件狀態變為True的訊號 print('\033[0;32;40mcar%s通過\033[0m'%i) def light(e): while True: if e.is_set(): e.clear() print('\033[31m紅燈亮了\033[0m') else: e.set() print('\033[32m綠燈亮了\033[0m') time.sleep(2) if __name__ == '__main__': e = Event() traffic = Process(target=light,args=(e,)) traffic.start() for i in range(20): car = Process(target=cars,args=(e,i)) car.start() time.sleep(random.randint(1,5))
佇列 --- 程序間的通訊
-
程序間通訊-IPC(使用Queue模組)
import time from multiprocessing import Queue #使用佇列模組 q = Queue(5) #設定佇列中只能有5個程序或資料 q.put(1) #往佇列中放入1 q.put(2) q.put(3) q.put(4) q.put(5) #此時佇列已經滿了,如果再往裡放則為阻塞狀態 print(q.full()) # 查詢佇列是否滿了(True為滿了,False為不滿) print(q.get()) #從佇列中取出 print(q.get()) print(q.get()) print(q.get()) print(q.get()) #此時佇列已經空了,如果繼續取出,則為阻塞狀態 print(q.empty()) # 查詢佇列是否為空(True為空,False為未空) q.get_nowait() #強制取值,此時會報queue.Empty錯誤,表示佇列已經空 # while True: # try: # q.get_nowait() # except: # print('佇列已空') # time.sleep(0.5) # while迴圈1秒會迴圈上千次,損耗記憶體,此時加上睡眠時間以避免記憶體過度損耗
-
注意:
- q.empty()為檢查佇列是否為空,有不可靠因素。此方法是實時檢測佇列是否為空,如果此時生產者有往佇列中正在新增的程序時,佇列此時為空
-
簡單的多程序佇列模型
from multiprocessing import Queue,Process def produce(q): #生產資料 q.put('hello') def consume(q): #取出資料 print(q.get()) if __name__ == '__main__': q = Queue() #此隊列表示沒有限制 p = Process(target=produce,args=(q,)) p.start() c = Process(target=consume,args=(q,)) c.start()
-
經典的生產者消費者模型
- 為解決供需不平衡的問題
#plan 1:不完整,存在BUG import time import random from multiprocessing import Queue,Process def consumer(q,name): while True: food = q.get() if food is None: #如果獲取到None則終止迴圈,問題bug:此時如果多個消費者取佇列中的None, #只能是第一個程序能取到,剩餘程序為阻塞狀態。需要往佇列中新增與消費者數量相匹配的None。 print('%s獲取到一個空'%name) break print( '\033[31m%s消費了%s\033[0m'%(name,food)) time.sleep(random.randint(1,3)) def producer(name,food,q): # 建立生產者 for i in range(4): time.sleep(random.randint(1,3)) foods = '%s生產了%s個%s'%(name,i+1,food) print(foods) q.put(foods) if __name__ == '__main__': q = Queue(20) # 建立佇列,所有程序裡最大限制20 p1 = Process(target=producer,args=('alec','包子',q)) p1.start() p2 = Process(target=producer,args=('yazhou','玉米',q)) p2.start() c1 = Process(target=consumer,args=(q,'喳喳輝')) c1.start() c2 = Process(target=consumer,args=(q,'古天樂')) c2.start() p1.join() #讓生產者程序迴歸到主程式程序 p2.join() q.put(None) #佇列中放入兩個None,讓消費者取到後結束阻塞狀態 q.put(None)
-
使用Joinablequeue模組
#plan 2:
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):
while True:
food = q.get()
print( '\033[31m%s消費了%s\033[0m'%(name,food))
time.sleep(random.randint(1,3))
q.task_done() # 每執行一次該命令都會被記錄下來,直到佇列中的所有資料都執行完此命令
def producer(name,food,q): # 建立生產者
for i in range(4):
time.sleep(random.randint(1,3))
foods = '%s生產了%s個%s'%(name,i+1,food)
print(foods)
q.put(foods)
q.join() # 程序延遲了,進入阻塞狀態,直到一個佇列中的所有資料全部被處理完畢
if __name__ == '__main__':
q = JoinableQueue(20) # 建立佇列,所有程序裡最大限制20
p1 = Process(target=producer,args=('alec','包子',q))
p2 = Process(target=producer,args=('yazhou','玉米',q))
p1.start()
p2.start()
c1 = Process(target=consumer,args=(q,'喳喳輝'))
c2 = Process(target=consumer,args=(q,'古天樂'))
c1.daemon = True #設定為守護程序,主程序中的程式碼執行完畢之後,子程序自動結束
c2.daemon = True
c1.start()
c2.start()
p1.join() #讓生產者程序迴歸到主程式程序
p2.join()
此模組執行流程:
從消費者端看:
- 每次獲取一個數據
- 處理一個數據
- 傳送一個記號:標誌一個數據被處理成功
從生產者端看:
-
每次生產一個數據,並放入佇列中
-
對佇列中每一個數據刻上記號
-
當生產者全部生產完畢後,傳送join訊號,程序為阻塞狀態:
- 此時生產者已經停止生產資料了
- 等待之前被刻上記號的資料都被消費完
- 當資料都被處理完時,join阻塞結束
-
1.consumer中把所有的任務都消耗完
-
2.producer端的join感知到,停止阻塞
-
3.所有的producer程序結束# 主程序中的p.join結束
-
4.主程序中程式碼結束# 守護程序(消費者的程序)結束
總結:
- 消費者模型