生產者+消費者模型、控制內存
阿新 • • 發佈:2019-01-26
get 解決辦法 發現 one ons imp 內存 停止 生產者消費者模型
生產者+消費者模型
從網上爬取數據
從網頁上獲取數據的過程 --》 生產數據的過程,爬取網頁,是生產者行為
把數據取回來進行分析得出結果 --》數據消費過程,是消費者行為
使用隊列來完成生產、消費的過程
生產者,是進程
消費者,是進程
生產者與消費者之間,傳遞數據,需要一個盤子(IPC)
# 沒設置時間延遲的情況下 from multiprocessing import Queue, Process def consumer(name, q): while 1: food = q.get() # 會阻塞,直到隊列裏有數據就取出來 print("%s把包子%s號給吃了" % (name, food)) def producer(q, food_name): for i in range(10): food = "%s%s" % (food_name, i) print("我做了%s號" % food) q.put(food) if __name__ == "__main__": q = Queue() p = Process(target=consumer, args=("alex", q)) p1 = Process(target=producer, args=(q, "包子")) p.start() p1.start() # 我做了包子0號 # 我做了包子1號 # 我做了包子2號 # 我做了包子3號 # 我做了包子4號 # alex把包子包子0號給吃了 # 我做了包子5號 # 我做了包子6號 # 我做了包子7號 # 我做了包子8號 # 我做了包子9號 # alex把包子包子1號給吃了 # alex把包子包子2號給吃了 # alex把包子包子3號給吃了 # alex把包子包子4號給吃了 # alex把包子包子5號給吃了 # alex把包子包子6號給吃了 # alex把包子包子7號給吃了 # alex把包子包子8號給吃了 # alex把包子包子9號給吃了# 運行發現明顯生產比消費更快,供大於求,註意程序並沒有停止運行
# 設置時間延遲後 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() # 會阻塞,直到隊列裏有數據就取出來 time.sleep(1) print("%s把包子%s號給吃了" % (name, food)) def producer(q, food_name): for i in range(10): time.sleep(0.5) food = "%s%s" % (food_name, i) print("我做了%s號" % food) q.put(food) if __name__ == "__main__": q = Queue() p = Process(target=consumer, args=("alex", q)) p1 = Process(target=producer, args=(q, "包子")) p.start() p1.start() # 我做了包子0號 # 我做了包子1號 # 我做了包子2號 # alex把包子包子0號給吃了 # 我做了包子3號 # alex把包子包子1號給吃了 # 我做了包子4號 # 我做了包子5號 # alex把包子包子2號給吃了 # 我做了包子6號 # 我做了包子7號 # alex把包子包子3號給吃了 # 我做了包子8號 # 我做了包子9號 # alex把包子包子4號給吃了 # alex把包子包子5號給吃了 # alex把包子包子6號給吃了 # alex把包子包子7號給吃了 # alex把包子包子8號給吃了 # alex把包子包子9號給吃了 # 運行發現,雖然設置了延遲,但是生產(producer)的延遲比消費(consumer)的延遲更短 # 所以還是供大於求
# 修改生產者與消費者的延遲設置 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() # 會阻塞,直到隊列裏有數據就取出來 time.sleep(0.5) print("%s把包子%s號給吃了" % (name, food)) def producer(q, food_name): for i in range(10): time.sleep(1) food = "%s%s" % (food_name, i) print("我做了%s號" % food) q.put(food) if __name__ == "__main__": q = Queue() p = Process(target=consumer, args=("alex", q)) p1 = Process(target=producer, args=(q, "包子")) p.start() p1.start() # 我做了包子0號 # alex把包子包子0號給吃了 # 我做了包子1號 # alex把包子包子1號給吃了 # 我做了包子2號 # alex把包子包子2號給吃了 # 我做了包子3號 # alex把包子包子3號給吃了 # 我做了包子4號 # alex把包子包子4號給吃了 # 我做了包子5號 # alex把包子包子5號給吃了 # 我做了包子6號 # alex把包子包子6號給吃了 # 我做了包子7號 # alex把包子包子7號給吃了 # 我做了包子8號 # alex把包子包子8號給吃了 # 我做了包子9號 # alex把包子包子9號給吃了 # 很明顯,在只有一個消費者和一個生產者的前提下,如果生產的時間要比消費的時間更長 # 那麽每生產一個包子,消費者都能立刻吃掉,並且等著下一個生產好的包子
# 基於隊列實現生產者消費者模型 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() # 會阻塞,直到隊列裏有數據就取出來 time.sleep(2) print("%s拿到了包子%s號後並把它吃掉" % (name, food)) def producer(q, food_name): for i in range(20): # 生產20個包子 time.sleep(0.1) food = "%s%s" % (food_name, i) print("我做了%s號" % food) q.put(food) # 如果隊列滿了就在這裏阻塞,直到隊列裏有空位置 if __name__ == "__main__": q = Queue(5) # 這個隊列的容量,這裏是指 6個的容量 p = Process(target=consumer, args=("alex", q)) # 消費者1號 p1 = Process(target=consumer, args=("太白", q)) # 消費者2號 p2 = Process(target=producer, args=(q, "包子")) # 生產者1號 p.start() p1.start() p2.start() # 我做了包子0號 # 我做了包子1號 # 我做了包子2號 # 我做了包子3號 # 我做了包子4號 # 我做了包子5號 # 我做了包子6號 # 我做了包子7號 # alex拿到包子包子0號並把它吃掉 # 太白拿到包子包子1號並把它吃掉 # 我做了包子8號 # 我做了包子9號 # alex拿到包子包子2號並把它吃掉 # 我做了包子10號 # 太白拿到包子包子3號並把它吃掉 # 我做了包子11號 # alex拿到包子包子4號並把它吃掉 # 太白拿到包子包子5號並把它吃掉 # 我做了包子12號 # 我做了包子13號 # alex拿到包子包子6號並把它吃掉 # 我做了包子14號 # 太白拿到包子包子7號並把它吃掉 # 我做了包子15號 # alex拿到包子包子8號並把它吃掉 # 我做了包子16號 # 太白拿到包子包子9號並把它吃掉 # 我做了包子17號 # alex拿到包子包子10號並把它吃掉 # 我做了包子18號 # 太白拿到包子包子11號並把它吃掉 # 我做了包子19號 # alex拿到包子包子12號並把它吃掉 # 太白拿到包子包子13號並把它吃掉 # alex拿到包子包子14號並把它吃掉 # 太白拿到包子包子15號並把它吃掉 # alex拿到包子包子16號並把它吃掉 # 太白拿到包子包子17號並把它吃掉 # alex拿到包子包子18號並把它吃掉 # 太白拿到包子包子19號並把它吃掉 # alex拿到包子None號並把它吃掉 # 太白拿到包子None號並把它吃掉 # 註意一開始為什麽要生產8個包子才開始吃? # 因為是本來這個隊列q = Queue(5)限定了只能放6個包子進去 # 但是生產者一旦把6個包子放進去,兩個消費者就分別從裏面取出一個包子 # 只不過,重點是消費者要2s後才能繼續拿包子吃,但他們已經把兩個包子拿出來了 # 所以生產者還得生產2個包子放進去,把隊列排滿 # 運行發現,中間之所以吃了一個才會生產一個,是因為隊列已經滿了
# 前面所有的示例, 都是主進程永遠不會結束 # 原因是:生產者p在生產完後就結束了 # 但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步 # 解決辦法: 主進程裏發結束信號,但主進程需要等生產者結束後再發送該信號 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() if not food: break time.sleep(2) print("%s拿到了%s號並開始吃它" % (name, food)) def producer(q, food_name): for i in range(20): # 生產20個包子 time.sleep(0.1) food = "%s%s" % (food_name, i) print("我做了%s號" % food) q.put(food) # 如果隊列滿了就在這裏阻塞,直到隊列裏有空位置 if __name__ == "__main__": q = Queue(5) # 這個隊列的容量,這裏是指 6個的容量 p = Process(target=consumer, args=("alex", q)) # 消費者1號 p1 = Process(target=consumer, args=("太白", q)) # 消費者2號 p2 = Process(target=producer, args=(q, "包子")) # 生產者1號 p.start() p2.start() p1.start() p2.join() # 確認生產者把所需生產數量都生產完畢 q.put(None) q.put(None) # 運行結果和上面一樣,不同的是程序會結束 # 吃的人是消費數據的 # 制造吃的人是生產數據的 # 生產快,消費慢,供過於求 # 增加消費者 # 生產慢,消費快,供不應求 # 增加生產者
# 控制內存 import queue q = queue.Queue(5) # 設置一個容量值,保護內存 q.put(1) print("1 over") q.put(2) print("2 over") q.put(3) print("3 over") q.put(4) print("4 over") q.put(5) print("5 over") q.put(6) print("6 over") # 1 over # 2 over # 3 over # 4 over # 5 over # 運行發現程序不會停,因為它還在等,等設置的內存裏減少東西再執行q.put(6)
生產者+消費者模型、控制內存