1. 程式人生 > >生產者+消費者模型、控制內存

生產者+消費者模型、控制內存

get 解決辦法 發現 one ons imp 內存 停止 生產者消費者模型

生產者+消費者模型
從網上爬取數據
從網頁上獲取數據的過程 --》 生產數據的過程,爬取網頁,是生產者行為
把數據取回來進行分析得出結果 --》數據消費過程,是消費者行為

使用隊列來完成生產、消費的過程
生產者,是進程
消費者,是進程
生產者與消費者之間,傳遞數據,需要一個盤子(IPC)

# 沒設置時間延遲的情況下

from multiprocessing import Queue, Process

def consumer(name, q):
    while 1:
        food = q.get()  # 會阻塞,直到隊列裏有數據就取出來
        print("%s把包子%s號給吃了
" % (name, food)) def producer(q, food_name): for i in range(10): food = "%s%s" % (food_name, i) print("我做了%s號" % food) q.put(food) if __name__ == "__main__": q = Queue() p = Process(target=consumer, args=("alex", q)) p1 = Process(target=producer, args=(q, "包子
")) p.start() p1.start() # 我做了包子0號 # 我做了包子1號 # 我做了包子2號 # 我做了包子3號 # 我做了包子4號 # alex把包子包子0號給吃了 # 我做了包子5號 # 我做了包子6號 # 我做了包子7號 # 我做了包子8號 # 我做了包子9號 # alex把包子包子1號給吃了 # alex把包子包子2號給吃了 # alex把包子包子3號給吃了 # alex把包子包子4號給吃了 # alex把包子包子5號給吃了 # alex把包子包子6號給吃了 # alex把包子包子7號給吃了 # alex把包子包子8號給吃了 # alex把包子包子9號給吃了
# 運行發現明顯生產比消費更快,供大於求,註意程序並沒有停止運行

# 設置時間延遲後

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()  # 會阻塞,直到隊列裏有數據就取出來
        time.sleep(1)
        print("%s把包子%s號給吃了" % (name, food))

def producer(q, food_name):
    for i in range(10):
        time.sleep(0.5)
        food = "%s%s" % (food_name, i)
        print("我做了%s號" % food)
        q.put(food)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=consumer, args=("alex", q))
    p1 = Process(target=producer, args=(q, "包子"))
    p.start()
    p1.start()

# 我做了包子0號
# 我做了包子1號
# 我做了包子2號
# alex把包子包子0號給吃了
# 我做了包子3號
# alex把包子包子1號給吃了
# 我做了包子4號
# 我做了包子5號
# alex把包子包子2號給吃了
# 我做了包子6號
# 我做了包子7號
# alex把包子包子3號給吃了
# 我做了包子8號
# 我做了包子9號
# alex把包子包子4號給吃了
# alex把包子包子5號給吃了
# alex把包子包子6號給吃了
# alex把包子包子7號給吃了
# alex把包子包子8號給吃了
# alex把包子包子9號給吃了

# 運行發現,雖然設置了延遲,但是生產(producer)的延遲比消費(consumer)的延遲更短
# 所以還是供大於求

# 修改生產者與消費者的延遲設置

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()  # 會阻塞,直到隊列裏有數據就取出來
        time.sleep(0.5)
        print("%s把包子%s號給吃了" % (name, food))

def producer(q, food_name):
    for i in range(10):
        time.sleep(1)
        food = "%s%s" % (food_name, i)
        print("我做了%s號" % food)
        q.put(food)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=consumer, args=("alex", q))
    p1 = Process(target=producer, args=(q, "包子"))
    p.start()
    p1.start()

# 我做了包子0號
# alex把包子包子0號給吃了
# 我做了包子1號
# alex把包子包子1號給吃了
# 我做了包子2號
# alex把包子包子2號給吃了
# 我做了包子3號
# alex把包子包子3號給吃了
# 我做了包子4號
# alex把包子包子4號給吃了
# 我做了包子5號
# alex把包子包子5號給吃了
# 我做了包子6號
# alex把包子包子6號給吃了
# 我做了包子7號
# alex把包子包子7號給吃了
# 我做了包子8號
# alex把包子包子8號給吃了
# 我做了包子9號
# alex把包子包子9號給吃了

# 很明顯,在只有一個消費者和一個生產者的前提下,如果生產的時間要比消費的時間更長
# 那麽每生產一個包子,消費者都能立刻吃掉,並且等著下一個生產好的包子

# 基於隊列實現生產者消費者模型

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()  # 會阻塞,直到隊列裏有數據就取出來
        time.sleep(2)
        print("%s拿到了包子%s號後並把它吃掉" % (name, food))

def producer(q, food_name):
    for i in range(20):  # 生產20個包子
        time.sleep(0.1)
        food = "%s%s" % (food_name, i)
        print("我做了%s號" % food)
        q.put(food)  # 如果隊列滿了就在這裏阻塞,直到隊列裏有空位置

if __name__ == "__main__":
    q = Queue(5)  # 這個隊列的容量,這裏是指 6個的容量
    p = Process(target=consumer, args=("alex", q))  # 消費者1號
    p1 = Process(target=consumer, args=("太白", q))  # 消費者2號
    p2 = Process(target=producer, args=(q, "包子"))  #  生產者1號
    p.start()
    p1.start()
    p2.start()

# 我做了包子0號
# 我做了包子1號
# 我做了包子2號
# 我做了包子3號
# 我做了包子4號
# 我做了包子5號
# 我做了包子6號
# 我做了包子7號
# alex拿到包子包子0號並把它吃掉
# 太白拿到包子包子1號並把它吃掉
# 我做了包子8號
# 我做了包子9號
# alex拿到包子包子2號並把它吃掉
# 我做了包子10號
# 太白拿到包子包子3號並把它吃掉
# 我做了包子11號
# alex拿到包子包子4號並把它吃掉
# 太白拿到包子包子5號並把它吃掉
# 我做了包子12號
# 我做了包子13號
# alex拿到包子包子6號並把它吃掉
# 我做了包子14號
# 太白拿到包子包子7號並把它吃掉
# 我做了包子15號
# alex拿到包子包子8號並把它吃掉
# 我做了包子16號
# 太白拿到包子包子9號並把它吃掉
# 我做了包子17號
# alex拿到包子包子10號並把它吃掉
# 我做了包子18號
# 太白拿到包子包子11號並把它吃掉
# 我做了包子19號
# alex拿到包子包子12號並把它吃掉
# 太白拿到包子包子13號並把它吃掉
# alex拿到包子包子14號並把它吃掉
# 太白拿到包子包子15號並把它吃掉
# alex拿到包子包子16號並把它吃掉
# 太白拿到包子包子17號並把它吃掉
# alex拿到包子包子18號並把它吃掉
# 太白拿到包子包子19號並把它吃掉
# alex拿到包子None號並把它吃掉
# 太白拿到包子None號並把它吃掉

# 註意一開始為什麽要生產8個包子才開始吃?
# 因為是本來這個隊列q = Queue(5)限定了只能放6個包子進去
# 但是生產者一旦把6個包子放進去,兩個消費者就分別從裏面取出一個包子
# 只不過,重點是消費者要2s後才能繼續拿包子吃,但他們已經把兩個包子拿出來了
# 所以生產者還得生產2個包子放進去,把隊列排滿
# 運行發現,中間之所以吃了一個才會生產一個,是因為隊列已經滿了

# 前面所有的示例, 都是主進程永遠不會結束
# 原因是:生產者p在生產完後就結束了
# 但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步
# 解決辦法: 主進程裏發結束信號,但主進程需要等生產者結束後再發送該信號

from multiprocessing import Queue, Process
import time

def consumer(name, q):
    while 1:
        food = q.get()
        if not food: break
        time.sleep(2)
        print("%s拿到了%s號並開始吃它" % (name, food))


def producer(q, food_name):
    for i in range(20):  # 生產20個包子
        time.sleep(0.1)
        food = "%s%s" % (food_name, i)
        print("我做了%s號" % food)
        q.put(food)  # 如果隊列滿了就在這裏阻塞,直到隊列裏有空位置

if __name__ == "__main__":
    q = Queue(5)  # 這個隊列的容量,這裏是指 6個的容量
    p = Process(target=consumer, args=("alex", q))  # 消費者1號
    p1 = Process(target=consumer, args=("太白", q))  # 消費者2號
    p2 = Process(target=producer, args=(q, "包子"))  # 生產者1號
    p.start()
    p2.start()
    p1.start()
    p2.join()  # 確認生產者把所需生產數量都生產完畢
    q.put(None)
    q.put(None)

# 運行結果和上面一樣,不同的是程序會結束

# 吃的人是消費數據的
# 制造吃的人是生產數據的
# 生產快,消費慢,供過於求
#   增加消費者
# 生產慢,消費快,供不應求
#   增加生產者

# 控制內存

import queue

q = queue.Queue(5)  # 設置一個容量值,保護內存
q.put(1)
print("1 over")
q.put(2)
print("2 over")
q.put(3)
print("3 over")
q.put(4)
print("4 over")
q.put(5)
print("5 over")
q.put(6)
print("6 over")
# 1 over
# 2 over
# 3 over
# 4 over
# 5 over

# 運行發現程序不會停,因為它還在等,等設置的內存裏減少東西再執行q.put(6)

生產者+消費者模型、控制內存