1. 程式人生 > 其它 >【棧與佇列】力扣155:最小棧

【棧與佇列】力扣155:最小棧

一 生產者與消費者模型

生產者與消費者是一種面向物件的設計模式,主要作用是用於解決程式中生產和消費的供需場景問題的。

示例一

 1 import time, random, os
 2 from multiprocessing import Process, Queue
 3 # IPC:程序間的通訊,可以使用Queue來完成
 4 
 5 def consumer(q):
 6     """消費者"""
 7     while True:
 8         # 從佇列中提取資料
 9         res = q.get()
10         if res is None: break
# 當佇列中提取到結束資訊時,結束當前while迴圈 11 time.sleep(random.randint(1, 3)) 12 print(f"{os.getpid()}吃{res}") 13 14 15 def producer(q): 16 """生產者""" 17 for i in range(2): 18 time.sleep(random.randint(1, 3)) 19 res = f"包子{i}" 20 q.put(res) # 把資料儲存到佇列中 21 print
(f"{os.getpid()}生產了{res}") 22 23 24 def task(p_count, c_count): 25 # 相當於服務員 26 q = Queue() 27 # 生產者們: 即廚師 28 p_list = [] 29 for i in range(p_count): 30 p = Process(target=producer, args=(q,)) 31 p.start() 32 p_list.append(p) 33 34 # 消費者們: 即顧客 35 for
i in range(c_count): 36 c1 = Process(target=consumer, args=(q,)) 37 c1.start() 38 39 for p in p_list: p.join() # 這裡阻塞等待所有的生產者全部提交任務 40 41 # 當出現多個生產者與消費者時,結束訊號就要隨著消費者的數量來發送。 42 for _ in range(c_count): 43 q.put(None) # 傳送一個結束資訊給佇列中 44 45 if __name__ == '__main__': 46 task(3, 3)
程序佇列實現生產者消費者模型

示例二【瞭解】

JoinableQueue可以建立可連線的共享程序佇列,像是一個Queue物件,但JoinableQueue佇列允許專案的使用者通知生產者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。

 1 import time, random, os
 2 
 3 from multiprocessing import Process, JoinableQueue
 4 
 5 
 6 def consumer(jq):
 7     """消費者"""
 8     while True:
 9         res = jq.get()  # 從佇列中提取資料
10         time.sleep(random.randint(1, 3))
11         print(f"{os.getpid()}吃{res}")
12         jq.task_done()  # 向q.join()傳送一次訊號, 證明一個數據已經被取走了
13 
14 def producer(jq):
15     """生產者"""
16     for i in range(3):
17         time.sleep(random.randint(1, 3))
18         res = f"包子{i}"
19         jq.put(res)  # 把通訊的資料儲存到佇列中
20         print(f"{os.getpid()}生產了{res}")
21 
22     jq.join()  # 生產完畢,使用此方法進行阻塞,直到佇列中所有專案均被處理。
23 
24 
25 def task(p_count, c_count):
26     """任務流程"""
27     # 建立一個程序共享佇列物件
28     jq = JoinableQueue()
29     # 建立生產者
30     p_list = []
31     for i in range(p_count):
32         p = Process(target=producer, args=(jq,))
33         p.start()
34         p_list.append(p)
35 
36     # 建立消費者
37     for i in range(c_count):
38         c = Process(target=consumer, args=(jq,))
39         # 設定消費者程序為守護程序
40         c.daemon = True
41         c.start()
42 
43     # 開始
44     for p in p_list: p.join()
45     print('主程序')
46 
47 
48 if __name__ == '__main__':
49     task(3, 3)
共享程序佇列實現生產者消費者模型

二 程序間的資料共享【Manager】

多程序間的資料是獨立在不同記憶體的,而執行緒之間的資料是共享。如何讓程序間也能實現資料共享呢?

可以基於檔案來完成程序間的資料共享。但需要我們手動操作檔案來記錄程序間的共享資料。

幸運的事python裡面的multiprocessing模型已經內建實現了,那就是Manager物件。

 1 from multiprocessing import Process, Manager, Lock
 2 
 3 def func(data, lock):
 4     with lock:
 5         data["count"] -= 1
 6 
 7 if __name__ == "__main__":
 8     # 設定程序間要共享的資料
 9     manager = Manager()
10     data = manager.dict({"count": 100}) # 表示在多個子程序之間共享一個字典資料
11     lock = Lock()
12 
13     p_list = []
14     for i in range(100):
15         p = Process(target=func, args=(data, lock))
16         p.start()
17         p_list.append(p)
18 
19 
20     # 等待每一個程序執行完畢
21     for p in p_list: p.join()
22 
23     print(data) # {'count': 0}
基於manager和lock實現程序間的資料共享並保證資料安全

三 訊號量【Semaphore】適用程序和執行緒

訊號量不僅在程序模組multiprocessing中存在,而且threading模組中也有,作用一樣,只是針對的物件不一樣

遞迴鎖【RLock】:實現同一時間允許多個程序上多把鎖,只允許同一時間只有一個程序或執行緒修改資料

訊號量【Semaphore】:實現同一時間允許多個程序上多把鎖,允許同一時間多個程序同時修改多個共享資料而且還要加鎖

實現原理是基於計數器+鎖實現的,它允許同時給1個或多個程序上鎖,當資源釋放時計數器就會遞增,當資源佔用時計數器就會遞減,多個程序可以通過操作訊號量,達到同步執行的目的

 1 import random
 2 import time
 3 from multiprocessing import Process, Semaphore
 4 # from threading import Thread, Semaphore
 5 
 6 
 7 def parking_lot(car, semaphore):
 8     """停車場"""
 9     # semaphore.acquire()
10     # print(f"{car}進入停車場,目前停車位:{semaphore.get_value()}")
11     # # 因為我們都不知道顧客會停留在裡面多久,所以我們使用隨機數模擬這個停留過程
12     # time.sleep(random.randrange(4, 10))
13     # print(f"P{car}離開停車場,目前停車位:{semaphore.get_value()+1}")
14     # semaphore.release()
15 
16     with semaphore:
17         print(f"{car}進入停車場,目前停車位:{semaphore.get_value()}")
18         # 因為我們都不知道顧客會停留在裡面多久,所以我們使用隨機數模擬這個停留過程
19         time.sleep(random.randrange(4, 10))
20         print(f"P{car}離開停車場,目前停車位:{semaphore.get_value()+1}")
21 
22 if __name__ == "__main__":
23     # 最多允許4個程序同時上鎖
24     semaphore = Semaphore(4)
25 
26     # 模擬10個顧客開車進來
27     for i in range(10):
28         # 顧客什麼時候來的我們也不清楚,所以模擬下這個時間過程
29         time.sleep(random.randint(1, 5))
30         p = Process(target=parking_lot, args=(f"car-{i}", semaphore))
31         p.start()
基於訊號量實現停車場的停車程式

四 事件【Event】適用程序和執行緒

多程序模組multiprocessing與多執行緒threading模組提供了事件(Event )可以用來實現程序間/執行緒間的同步通訊

執行機制是通過定義了一個多個程序共享的全域性標記Flag,如果Flag值為 False,當程式執行event.wait()方法時就會阻塞,如果Flag值為True時,程式執行event.wait()方法時不會阻塞繼續執行。


方法
描述
wait() 根據Flag的值判斷是否要阻塞程序,Flag為True時阻塞,Flase時不阻塞
set() 將Flag的值改成True
clear() 將Flag的值改成False
is_set() 判斷當前的Flag的值
 1 import time, random
 2 from multiprocessing import Process, Event
 3 
 4 
 5 def traffic_light(event):
 6     """紅綠燈程式"""
 7     while True:
 8         if event.is_set(): # 判斷事件中的Flag標記的值,如果是True,則亮紅燈
 9             print("紅燈亮")
10             event.clear()  # 亮完紅燈以後,把Flag標記的值改為False
11         else:
12             print("綠燈亮")
13             event.set()    # 亮完綠燈以後,把Flag標記的值改為True
14         time.sleep(2)
15 
16 def car(i, event):
17     """"""
18     if not event.is_set():
19         print(f"car{i}等待紅燈")
20         event.wait()
21     print(f"car{i}通過了路口。")
22 
23 if __name__ == '__main__':
24     # 建立一個事件物件
25     event = Event()
26     p = Process(target=traffic_light, args=(event,))
27     p.start()
28 
29     # 模擬30輛小車通過紅綠燈
30     for i in range(30):
31         # 我們不知道什麼時候有車來到路口,所以隨機時間來模擬這個過程
32         time.sleep(random.randrange(0, 2))
33         p = Process(target=car, args=(i, event))
34         p.start()
基於事件event紅路燈運作程式

五 池【適用程序和執行緒】

 一個程序池或執行緒池,在裡面放上固定數量的程序或執行緒,有任務來了,就拿池中的程序或執行緒物件來處理任務,等任務處理完畢,程序或執行緒並不關閉,而是將程序或執行緒再放回池中等待下一次任務到來。如果有很多工需要併發執行,池中的程序或執行緒數量不夠,任務就要等待之前的程序或執行緒執行任務完畢歸來,拿到空閒程序或執行緒才能繼續執行。

也就是說,池中程序或執行緒的數量是固定的,那麼同一時間最多有固定數量的程序或執行緒在執行。這樣不僅減輕了作業系統的排程難度,還節省了開閉程序或執行緒的開銷,同時實現了併發效果。 

5.1 實現程序池

python中提供了2個模組提供操作:

  • multiprocessing.Pool:multiprocessing.Pool建立的程序提供2種不同的執行方式:apply(同步呼叫),apply_async(非同步呼叫)

  • concurrent.futures.ProcessPoolExecutor

5.1.1 基於multiprocessing.Pool實現程序池

 1 import time, os, random
 2 from multiprocessing import Pool
 3 
 4 def func(n):
 5     print(f"子程序{n}執行了....")
 6     time.sleep(2)
 7     return f"子程序{n}"
 8 
 9 if __name__ == '__main__':
10     start_time = time.time()
11     """建立一個程序池"""
12     # n = os.cpu_count()  # 本機CPU個數,我的是12,程序池容量個數自定義,預設CPU核數
13     # p = Pool(processes=n)
14     p = Pool(4)  # 指定程序池中初始化時建立多少個程序在裡面,預設根據作業系統的CPU邏輯數量來建立
15     """往程序池裡面的程序新增要執行的任務"""
16     res_list = []
17     # 建立20個任務
18     for i in range(20):
19         res = p.apply(func, args=(i,))  # 使用同步呼叫的方式,apply的返回值是任務的return返回值
20         res_list.append(res)
21 
22     print(f'使用時間: {time.time() - start_time}')
23     print(f"全部任務的執行結果:{res_list}")
同步呼叫程序池apply
 1 import time, os, random
 2 from multiprocessing import Pool
 3 
 4 def func(n):
 5     print(f"子程序{n}執行了....")
 6     time.sleep(2)
 7     return f"子程序{n}"
 8 
 9 if __name__ == '__main__':
10     start_time = time.time()
11     """建立一個程序池"""
12     # n = os.cpu_count()  # 本機CPU個數,我的是12,程序池容量個數自定義, 預設CPU邏輯核數
13     # p = Pool(processes=n)
14     p = Pool()  # 指定程序池中初始化時建立多少個程序在裡面,預設根據作業系統的CPU邏輯數量來建立
15     """往程序池裡面的程序新增要執行的任務"""
16     res_list = []
17     # 建立20個任務
18     for i in range(20):
19         res = p.apply_async(func, args=(i,))  # 使用非同步呼叫的方式,apply_async的返回值是任務的非同步結果物件
20         res_list.append(res)  #
21 
22     p.close()  # 關閉程序池, 不再有新的任務加入到pool中, 防止進一步的操作
23     p.join()   # 必須在close呼叫之後執行, 執行後等待所有子程序結束,否則報錯
24 
25     print(f'使用時間: {time.time() - start_time}')
26 
27     results = [res.get() for res in res_list] # get() 同步阻塞方法
28     print(f"全部任務的執行結果:{results}")
非同步呼叫示例apply_async

非同步呼叫例項apply_async:程序池實現socketserver

 1 import socket
 2 from multiprocessing import Pool
 3 
 4 def talk(conn):
 5     """通訊方法"""
 6     while True:
 7         message = conn.recv(1024)
 8         print(message)
 9         conn.send(message)
10     conn.close()
11 
12 if __name__ =="__main__":
13     sk = socket.socket()
14     sk.bind(("127.0.0.1", 9000))
15     sk.listen(5)
16 
17     # Pool預設獲取cpu_counter cpu最大邏輯核心數我的機器是12
18     p = Pool()
19 
20     while True:
21         conn, addr = sk.accept()
22         p.apply_async(talk, args=(conn,))
23     sk.close()
server.py
1 import socket
2 sk = socket.socket()
3 sk.connect( ("127.0.0.1", 9000) )
4 
5 while True:
6     content = input(">:")
7     sk.send(content.encode("utf-8"))
8     print(sk.recv(1024))
client.py

5.1.2 基於multiprocessing.futures.ProcessPoolExecutor實現程序池

 1 import random, time
 2 from concurrent.futures import ProcessPoolExecutor
 3 
 4 def func(n):
 5     print(f"子程序{n}開始執行...")
 6     time.sleep(random.randint(1, 3))
 7     print(f"子程序{n}執行結束...")
 8     return f"子程序{n}"  # 任務的返回值
 9 
10 
11 if __name__ == '__main__':
12     # 建立程序池,
13     # 可以通過processes引數指定程序池中初始化時建立多少個程序在裡面,
14     # 預設根據作業系統的CPU邏輯數量來建立
15     p = ProcessPoolExecutor(max_workers=4)
16     res_list = []
17     for i in range(20):
18         res = p.submit(func, i)  # 第一個引數為任務函式名,後續引數均為任務函式的引數
19         res_list.append(res)  # submit的返回值是一個非同步物件,通過物件的result方法可以獲取任務結果
20 
21     # print([res.result() for res in res_list])  # result阻塞同步方法,用於提取任務結果,也就是func的返回值
22 
23     # 關閉程序池,後續不能繼續執行submit提交任務,並阻塞等待所有的提交任務全部執行完成。
24     # 相當於原來的 for p in p_list: p.join()
25     p.shutdown() #阻塞效果
26     print("主程序結束")
基於concurrent.futures實現程序池



 1 import random, time
 2 from concurrent.futures import ProcessPoolExecutor
 3 
 4 def func(n):
 5     print(f"子程序{n}開始執行...")
 6     time.sleep(random.randint(1, 3))
 7     print(f"子程序{n}執行結束...")
 8     return f"子程序{n}"  # 任務的返回值
 9 
10 
11 if __name__ == '__main__':
12     # 建立程序池,
13     # 可以通過processes引數指定程序池中初始化時建立多少個程序在裡面,
14     # 預設根據作業系統的CPU邏輯數量來建立
15     p = ProcessPoolExecutor(max_workers=4)
16 
17     # res_list = []
18     # for i in range(20):
19     #     res = p.submit(func, i)  # 第一個引數為任務函式名,後續引數均為任務函式的引數
20     #     res_list.append(res)  # submit的返回值是一個非同步物件,通過物件的result方法可以獲取任務結果
21 
22     # print([res.result() for res in res_list])  # result阻塞同步方法,用於提取任務結果,也就是func的返回值
23 
24     # 關閉程序池,後續不能繼續執行submit提交任務,並阻塞等待所有的提交任務全部執行完成。
25     # 相當於原來的 for p in p_list: p.join()
26     # p.shutdown()
27 
28     res_list = p.map(func, range(20))
29     print([res for res in res_list])
30     print("主程序結束")
map方法優化-基於concurrent.futures實現程序池
 1 import random, time
 2 from concurrent.futures import ProcessPoolExecutor
 3 
 4 def task(n):
 5     print(f"子程序{n}開始執行...")
 6     time.sleep(random.randint(1, 3))
 7     print(f"子程序{n}執行結束...")
 8     return f"子程序{n}"  # 任務的返回值
 9 
10 
11 def task_callback(res):
12     print(f"對任務結果進行非同步回撥處理:{res.result()}")
13 
14 
15 if __name__ == '__main__':
16     p = ProcessPoolExecutor(2)
17     for i in range(5):
18         p.submit(task, i).add_done_callback(task_callback)
19 
20     p.shutdown()
21     print("主程序結束")
22 
23     # 把結果處理流程程式設計了同步回撥處理了
24     # res_list = []
25     # for i in range(5):
26     #     res = p.submit(task, i)
27     #     res_list.append(res)
28     #
29     # for res in res_list:
30     #     task_callback(res)  # result 同步阻塞
31     #
32     # p.shutdown()
33     # print("主程序結束")
add_done_callback方法-針對程序任務結果進行非同步回撥處理

5.2 實現執行緒池

threading模組並沒有像multiprocessing模組那樣提供類似程序池的功能,所以我們要實現執行緒池,只能通過concurrent.futures模組提供的ThreadPoolExecutor執行緒池類來實現,其用法與上面的的ProcessPoolExecutor一模一樣。執行緒池也有map方法,也有add_done_callback的結果非同步回撥操作。

 1 import random
 2 import time
 3 from concurrent.futures import ThreadPoolExecutor
 4 
 5 def func(n):
 6     print(f"子執行緒{n}開始執行...")
 7     time.sleep(random.randint(1, 5))
 8     print(f"子執行緒{n}執行結束...")
 9     return n
10 
11 if __name__ == '__main__':
12     p = ThreadPoolExecutor(4)
13     results = []
14     for i in range(20):
15         res = p.submit(func, i)   # 第一個引數為函式名,後續引數為函式的引數
16         results.append(res)
17 
18     # p.shutdown()  # 關閉程序池,後續不能繼續執行submit提交任務,並阻塞等待所有的提交任務全部執行完成。
19     print([r.result() for r in results])  # 提取任務結果,也就是func的返回值
20     print("主執行緒結束")
21     # 這裡也有map方法,也有add_deno_callback的回撥操作
執行緒池的實現