【棧與佇列】力扣155:最小棧
一 生產者與消費者模型
生產者與消費者是一種面向物件的設計模式,主要作用是用於解決程式中生產和消費的供需場景問題的。
示例一
1 import time, random, os 2 from multiprocessing import Process, Queue 3 # IPC:程序間的通訊,可以使用Queue來完成 4 5 def consumer(q): 6 """消費者""" 7 while True: 8 # 從佇列中提取資料 9 res = q.get() 10 if res is None: break程序佇列實現生產者消費者模型# 當佇列中提取到結束資訊時,結束當前while迴圈 11 time.sleep(random.randint(1, 3)) 12 print(f"{os.getpid()}吃{res}") 13 14 15 def producer(q): 16 """生產者""" 17 for i in range(2): 18 time.sleep(random.randint(1, 3)) 19 res = f"包子{i}" 20 q.put(res) # 把資料儲存到佇列中 21 print(f"{os.getpid()}生產了{res}") 22 23 24 def task(p_count, c_count): 25 # 相當於服務員 26 q = Queue() 27 # 生產者們: 即廚師 28 p_list = [] 29 for i in range(p_count): 30 p = Process(target=producer, args=(q,)) 31 p.start() 32 p_list.append(p) 33 34 # 消費者們: 即顧客 35 fori in range(c_count): 36 c1 = Process(target=consumer, args=(q,)) 37 c1.start() 38 39 for p in p_list: p.join() # 這裡阻塞等待所有的生產者全部提交任務 40 41 # 當出現多個生產者與消費者時,結束訊號就要隨著消費者的數量來發送。 42 for _ in range(c_count): 43 q.put(None) # 傳送一個結束資訊給佇列中 44 45 if __name__ == '__main__': 46 task(3, 3)
示例二【瞭解】
JoinableQueue可以建立可連線的共享程序佇列,像是一個Queue物件,但JoinableQueue佇列允許專案的使用者通知生產者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。
1 import time, random, os 2 3 from multiprocessing import Process, JoinableQueue 4 5 6 def consumer(jq): 7 """消費者""" 8 while True: 9 res = jq.get() # 從佇列中提取資料 10 time.sleep(random.randint(1, 3)) 11 print(f"{os.getpid()}吃{res}") 12 jq.task_done() # 向q.join()傳送一次訊號, 證明一個數據已經被取走了 13 14 def producer(jq): 15 """生產者""" 16 for i in range(3): 17 time.sleep(random.randint(1, 3)) 18 res = f"包子{i}" 19 jq.put(res) # 把通訊的資料儲存到佇列中 20 print(f"{os.getpid()}生產了{res}") 21 22 jq.join() # 生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。 23 24 25 def task(p_count, c_count): 26 """任務流程""" 27 # 建立一個程序共享佇列物件 28 jq = JoinableQueue() 29 # 建立生產者 30 p_list = [] 31 for i in range(p_count): 32 p = Process(target=producer, args=(jq,)) 33 p.start() 34 p_list.append(p) 35 36 # 建立消費者 37 for i in range(c_count): 38 c = Process(target=consumer, args=(jq,)) 39 # 設定消費者程序為守護程序 40 c.daemon = True 41 c.start() 42 43 # 開始 44 for p in p_list: p.join() 45 print('主程序') 46 47 48 if __name__ == '__main__': 49 task(3, 3)共享程序佇列實現生產者消費者模型
二 程序間的資料共享【Manager】
多程序間的資料是獨立在不同記憶體的,而執行緒之間的資料是共享。如何讓程序間也能實現資料共享呢?
可以基於檔案來完成程序間的資料共享。但需要我們手動操作檔案來記錄程序間的共享資料。
幸運的事python裡面的multiprocessing模型已經內建實現了,那就是Manager物件。
1 from multiprocessing import Process, Manager, Lock 2 3 def func(data, lock): 4 with lock: 5 data["count"] -= 1 6 7 if __name__ == "__main__": 8 # 設定程序間要共享的資料 9 manager = Manager() 10 data = manager.dict({"count": 100}) # 表示在多個子程序之間共享一個字典資料 11 lock = Lock() 12 13 p_list = [] 14 for i in range(100): 15 p = Process(target=func, args=(data, lock)) 16 p.start() 17 p_list.append(p) 18 19 20 # 等待每一個程序執行完畢 21 for p in p_list: p.join() 22 23 print(data) # {'count': 0}基於manager和lock實現程序間的資料共享並保證資料安全
三 訊號量【Semaphore】適用程序和執行緒
遞迴鎖【RLock】:實現同一時間允許多個程序上多把鎖,只允許同一時間只有一個程序或執行緒修改資料
訊號量【Semaphore】:實現同一時間允許多個程序上多把鎖,允許同一時間多個程序同時修改多個共享資料而且還要加鎖。
實現原理是基於計數器+鎖實現的,它允許同時給1個或多個程序上鎖,當資源釋放時計數器就會遞增,當資源佔用時計數器就會遞減,多個程序可以通過操作訊號量,達到同步執行的目的
1 import random 2 import time 3 from multiprocessing import Process, Semaphore 4 # from threading import Thread, Semaphore 5 6 7 def parking_lot(car, semaphore): 8 """停車場""" 9 # semaphore.acquire() 10 # print(f"{car}進入停車場,目前停車位:{semaphore.get_value()}") 11 # # 因為我們都不知道顧客會停留在裡面多久,所以我們使用隨機數模擬這個停留過程 12 # time.sleep(random.randrange(4, 10)) 13 # print(f"P{car}離開停車場,目前停車位:{semaphore.get_value()+1}") 14 # semaphore.release() 15 16 with semaphore: 17 print(f"{car}進入停車場,目前停車位:{semaphore.get_value()}") 18 # 因為我們都不知道顧客會停留在裡面多久,所以我們使用隨機數模擬這個停留過程 19 time.sleep(random.randrange(4, 10)) 20 print(f"P{car}離開停車場,目前停車位:{semaphore.get_value()+1}") 21 22 if __name__ == "__main__": 23 # 最多允許4個程序同時上鎖 24 semaphore = Semaphore(4) 25 26 # 模擬10個顧客開車進來 27 for i in range(10): 28 # 顧客什麼時候來的我們也不清楚,所以模擬下這個時間過程 29 time.sleep(random.randint(1, 5)) 30 p = Process(target=parking_lot, args=(f"car-{i}", semaphore)) 31 p.start()基於訊號量實現停車場的停車程式
四 事件【Event】適用程序和執行緒
方法 |
描述 |
---|---|
wait() | 根據Flag的值判斷是否要阻塞程序,Flag為True時阻塞,Flase時不阻塞 |
set() | 將Flag的值改成True |
clear() | 將Flag的值改成False |
is_set() | 判斷當前的Flag的值 |
1 import time, random 2 from multiprocessing import Process, Event 3 4 5 def traffic_light(event): 6 """紅綠燈程式""" 7 while True: 8 if event.is_set(): # 判斷事件中的Flag標記的值,如果是True,則亮紅燈 9 print("紅燈亮") 10 event.clear() # 亮完紅燈以後,把Flag標記的值改為False 11 else: 12 print("綠燈亮") 13 event.set() # 亮完綠燈以後,把Flag標記的值改為True 14 time.sleep(2) 15 16 def car(i, event): 17 """車""" 18 if not event.is_set(): 19 print(f"car{i}等待紅燈") 20 event.wait() 21 print(f"car{i}通過了路口。") 22 23 if __name__ == '__main__': 24 # 建立一個事件物件 25 event = Event() 26 p = Process(target=traffic_light, args=(event,)) 27 p.start() 28 29 # 模擬30輛小車通過紅綠燈 30 for i in range(30): 31 # 我們不知道什麼時候有車來到路口,所以隨機時間來模擬這個過程 32 time.sleep(random.randrange(0, 2)) 33 p = Process(target=car, args=(i, event)) 34 p.start()基於事件event紅路燈運作程式
五 池【適用程序和執行緒】
一個程序池或執行緒池,在裡面放上固定數量的程序或執行緒,有任務來了,就拿池中的程序或執行緒物件來處理任務,等任務處理完畢,程序或執行緒並不關閉,而是將程序或執行緒再放回池中等待下一次任務到來。如果有很多工需要併發執行,池中的程序或執行緒數量不夠,任務就要等待之前的程序或執行緒執行任務完畢歸來,拿到空閒程序或執行緒才能繼續執行。
也就是說,池中程序或執行緒的數量是固定的,那麼同一時間最多有固定數量的程序或執行緒在執行。這樣不僅減輕了作業系統的排程難度,還節省了開閉程序或執行緒的開銷,同時實現了併發效果。
5.1 實現程序池
python中提供了2個模組提供操作:
-
multiprocessing.Pool:multiprocessing.Pool建立的程序提供2種不同的執行方式:apply(同步呼叫),apply_async(非同步呼叫)
-
concurrent.futures.ProcessPoolExecutor
5.1.1 基於multiprocessing.Pool實現程序池
1 import time, os, random 2 from multiprocessing import Pool 3 4 def func(n): 5 print(f"子程序{n}執行了....") 6 time.sleep(2) 7 return f"子程序{n}" 8 9 if __name__ == '__main__': 10 start_time = time.time() 11 """建立一個程序池""" 12 # n = os.cpu_count() # 本機CPU個數,我的是12,程序池容量個數自定義,預設CPU核數 13 # p = Pool(processes=n) 14 p = Pool(4) # 指定程序池中初始化時建立多少個程序在裡面,預設根據作業系統的CPU邏輯數量來建立 15 """往程序池裡面的程序新增要執行的任務""" 16 res_list = [] 17 # 建立20個任務 18 for i in range(20): 19 res = p.apply(func, args=(i,)) # 使用同步呼叫的方式,apply的返回值是任務的return返回值 20 res_list.append(res) 21 22 print(f'使用時間: {time.time() - start_time}') 23 print(f"全部任務的執行結果:{res_list}")同步呼叫程序池apply
1 import time, os, random 2 from multiprocessing import Pool 3 4 def func(n): 5 print(f"子程序{n}執行了....") 6 time.sleep(2) 7 return f"子程序{n}" 8 9 if __name__ == '__main__': 10 start_time = time.time() 11 """建立一個程序池""" 12 # n = os.cpu_count() # 本機CPU個數,我的是12,程序池容量個數自定義, 預設CPU邏輯核數 13 # p = Pool(processes=n) 14 p = Pool() # 指定程序池中初始化時建立多少個程序在裡面,預設根據作業系統的CPU邏輯數量來建立 15 """往程序池裡面的程序新增要執行的任務""" 16 res_list = [] 17 # 建立20個任務 18 for i in range(20): 19 res = p.apply_async(func, args=(i,)) # 使用非同步呼叫的方式,apply_async的返回值是任務的非同步結果物件 20 res_list.append(res) # 21 22 p.close() # 關閉程序池, 不再有新的任務加入到pool中, 防止進一步的操作 23 p.join() # 必須在close呼叫之後執行, 執行後等待所有子程序結束,否則報錯 24 25 print(f'使用時間: {time.time() - start_time}') 26 27 results = [res.get() for res in res_list] # get() 同步阻塞方法 28 print(f"全部任務的執行結果:{results}")非同步呼叫示例apply_async
非同步呼叫例項apply_async:程序池實現socketserver
1 import socket 2 from multiprocessing import Pool 3 4 def talk(conn): 5 """通訊方法""" 6 while True: 7 message = conn.recv(1024) 8 print(message) 9 conn.send(message) 10 conn.close() 11 12 if __name__ =="__main__": 13 sk = socket.socket() 14 sk.bind(("127.0.0.1", 9000)) 15 sk.listen(5) 16 17 # Pool預設獲取cpu_counter cpu最大邏輯核心數我的機器是12 18 p = Pool() 19 20 while True: 21 conn, addr = sk.accept() 22 p.apply_async(talk, args=(conn,)) 23 sk.close()server.py
1 import socket 2 sk = socket.socket() 3 sk.connect( ("127.0.0.1", 9000) ) 4 5 while True: 6 content = input(">:") 7 sk.send(content.encode("utf-8")) 8 print(sk.recv(1024))client.py
5.1.2 基於multiprocessing.futures.ProcessPoolExecutor實現程序池
1 import random, time 2 from concurrent.futures import ProcessPoolExecutor 3 4 def func(n): 5 print(f"子程序{n}開始執行...") 6 time.sleep(random.randint(1, 3)) 7 print(f"子程序{n}執行結束...") 8 return f"子程序{n}" # 任務的返回值 9 10 11 if __name__ == '__main__': 12 # 建立程序池, 13 # 可以通過processes引數指定程序池中初始化時建立多少個程序在裡面, 14 # 預設根據作業系統的CPU邏輯數量來建立 15 p = ProcessPoolExecutor(max_workers=4) 16 res_list = [] 17 for i in range(20): 18 res = p.submit(func, i) # 第一個引數為任務函式名,後續引數均為任務函式的引數 19 res_list.append(res) # submit的返回值是一個非同步物件,通過物件的result方法可以獲取任務結果 20 21 # print([res.result() for res in res_list]) # result阻塞同步方法,用於提取任務結果,也就是func的返回值 22 23 # 關閉程序池,後續不能繼續執行submit提交任務,並阻塞等待所有的提交任務全部執行完成。 24 # 相當於原來的 for p in p_list: p.join() 25 p.shutdown() #阻塞效果 26 print("主程序結束")基於concurrent.futures實現程序池
1 import random, time 2 from concurrent.futures import ProcessPoolExecutor 3 4 def func(n): 5 print(f"子程序{n}開始執行...") 6 time.sleep(random.randint(1, 3)) 7 print(f"子程序{n}執行結束...") 8 return f"子程序{n}" # 任務的返回值 9 10 11 if __name__ == '__main__': 12 # 建立程序池, 13 # 可以通過processes引數指定程序池中初始化時建立多少個程序在裡面, 14 # 預設根據作業系統的CPU邏輯數量來建立 15 p = ProcessPoolExecutor(max_workers=4) 16 17 # res_list = [] 18 # for i in range(20): 19 # res = p.submit(func, i) # 第一個引數為任務函式名,後續引數均為任務函式的引數 20 # res_list.append(res) # submit的返回值是一個非同步物件,通過物件的result方法可以獲取任務結果 21 22 # print([res.result() for res in res_list]) # result阻塞同步方法,用於提取任務結果,也就是func的返回值 23 24 # 關閉程序池,後續不能繼續執行submit提交任務,並阻塞等待所有的提交任務全部執行完成。 25 # 相當於原來的 for p in p_list: p.join() 26 # p.shutdown() 27 28 res_list = p.map(func, range(20)) 29 print([res for res in res_list]) 30 print("主程序結束")map方法優化-基於concurrent.futures實現程序池
1 import random, time 2 from concurrent.futures import ProcessPoolExecutor 3 4 def task(n): 5 print(f"子程序{n}開始執行...") 6 time.sleep(random.randint(1, 3)) 7 print(f"子程序{n}執行結束...") 8 return f"子程序{n}" # 任務的返回值 9 10 11 def task_callback(res): 12 print(f"對任務結果進行非同步回撥處理:{res.result()}") 13 14 15 if __name__ == '__main__': 16 p = ProcessPoolExecutor(2) 17 for i in range(5): 18 p.submit(task, i).add_done_callback(task_callback) 19 20 p.shutdown() 21 print("主程序結束") 22 23 # 把結果處理流程程式設計了同步回撥處理了 24 # res_list = [] 25 # for i in range(5): 26 # res = p.submit(task, i) 27 # res_list.append(res) 28 # 29 # for res in res_list: 30 # task_callback(res) # result 同步阻塞 31 # 32 # p.shutdown() 33 # print("主程序結束")add_done_callback方法-針對程序任務結果進行非同步回撥處理
5.2 實現執行緒池
threading模組並沒有像multiprocessing模組那樣提供類似程序池的功能,所以我們要實現執行緒池,只能通過concurrent.futures模組提供的ThreadPoolExecutor執行緒池類來實現,其用法與上面的的ProcessPoolExecutor一模一樣。執行緒池也有map方法,也有add_done_callback的結果非同步回撥操作。
1 import random 2 import time 3 from concurrent.futures import ThreadPoolExecutor 4 5 def func(n): 6 print(f"子執行緒{n}開始執行...") 7 time.sleep(random.randint(1, 5)) 8 print(f"子執行緒{n}執行結束...") 9 return n 10 11 if __name__ == '__main__': 12 p = ThreadPoolExecutor(4) 13 results = [] 14 for i in range(20): 15 res = p.submit(func, i) # 第一個引數為函式名,後續引數為函式的引數 16 results.append(res) 17 18 # p.shutdown() # 關閉程序池,後續不能繼續執行submit提交任務,並阻塞等待所有的提交任務全部執行完成。 19 print([r.result() for r in results]) # 提取任務結果,也就是func的返回值 20 print("主執行緒結束") 21 # 這裡也有map方法,也有add_deno_callback的回撥操作執行緒池的實現