並發編程的那些事兒(四)
1、生產者和消費者模型
作用:用於解耦。
原理:生產者將生產的數據放到緩存區,消費者從緩存區直接取。借助隊列來實現該模型(隊列就是緩存區)
隊列是安全的,自帶鎖機制。
q = Queue(num) num 為隊列的最大長度,可以自己設定。
q.put(): 向隊列裏放數據,如果數據滿了就阻塞等待,如果還能放就直接放如。
q.get(): 阻塞等待獲取數據,如果隊列中有數據直接拿,沒有就等待。
q.put_nowait(): 不阻塞,向隊列裏放數據,如果數據滿了就直接報錯。
q.get_nowait(): 不阻塞,直接從隊列拿數據,如果沒有數據就直接報錯。
方法一:
用 Queue (隊列)模塊編寫
from multiprocessing import Process, Queue def producer(q, production): for i in range(10): info = production + ‘打敗了第%s只 小怪獸‘ % (i + 1) q.put(info) q.put(None) # 當生產者生產完之後,放一個None def consumer(q, name): while 1: info生產者、消費者模型(Queue)put(None)放在子進程中= q.get() if info: print(‘%s%s‘ % (name, info)) else: # 當info 中沒有產品了,就會收到一個None,此時結束進程 break if __name__ == ‘__main__‘: q = Queue() p1 = Process(target=producer, args=(q, ‘奧德曼‘)) p2 = Process(target=consumer, args=(q, ‘迪迦‘)) p1.start() p2.start()
方法二:
用Queue(隊列)模塊編寫
from multiprocessing import Process, Queue def producer(q, production): for i in range(10): info = production + ‘打敗了第%s只 小怪獸‘ % (i + 1) q.put(info) def consumer(q, name, color): while 1: info = q.get() if info: print(‘%s%s%s\033[0m‘ % (color, name, info,)) else: break if __name__ == ‘__main__‘: q = Queue(5) p1 = Process(target=producer, args=(q, ‘奧特曼‘)) p2 = Process(target=producer, args=(q, ‘蝙蝠俠‘)) p3 = Process(target=producer, args=(q, ‘綠巨人‘)) p4 = Process(target=consumer, args=(q, ‘劉奶奶操縱‘, ‘\033[33m‘)) p5 = Process(target=consumer, args=(q, ‘李二毛騎著‘, ‘\033[36m‘)) p_lst = [p1, p2, p3, p4, p5] [i.start() for i in p_lst] p1.join() p2.join() p3.join() q.put(None) q.put(None)生產者、消費者(put(None))放在父進程
當有多個生產者和消費者時,有幾個消費者取,就放幾個put(None)
方法三:
用 JionableQueue (可加入隊列)編寫
JionableQueue是繼承Queue,同時多了兩個方法
q.jion(): 用於生產者。等待q.task_done的返回結果。生產者就能獲得消費者當前消費了多少數據。
q.task_done(): 用於消費者,消費者每拿一個數據都會給jion返回一個標識,標記數據。
from multiprocessing import Process, JoinableQueue def consumer(j, name, color): while 1: info = j.get() print(‘%s%s拿走了%s\033[0m‘ % (color, name, info)) j.task_done() def producer(j, name): for i in range(20): info = ‘第%s個%s‘ % (i + 1, name) j.put(info) j.join() if __name__ == ‘__main__‘: j = JoinableQueue(10) p1 = Process(target=consumer, args=(j, ‘猥瑣大叔‘, ‘\033[35m‘)) p2 = Process(target=producer, args=(j, ‘巴拉巴拉小魔仙‘,)) p1.daemon = True p1.start() p2.start() p2.join()生產者、消費者模型(JionableQueue)
2、管道
分為: 單進程管道和多進程管道
但進程管道收發:
con1 發送,只能con2接收。
con2發送, 只能con1接收
多進程管道收發:
父進程con1發送, 子進程只能con2接收
父進程con2發送, 子進程只能con1接收
父進程con1接收, 子進程只能con2發送
父進程con2接收, 子進程只能con1發送
from multiprocessing import Pipe, Process def fn(*args): con1, con2 = args con1.close() while 1: try: b = con2.recv() print(b) except EOFError: con2.close() break if __name__ == ‘__main__‘: con1, con2 = Pipe() p = Process(target=fn, args=(con1, con2)) p.start() con2.close() for i in range(10): con1.send(i) con1.close()多進程管道
3、進程間的數據共享
Manager
from multiprocessing import Manager
4、進程池
1)定義:一個池子裏(其實相當於一個存儲場所)有一定數量的進程。這些進城一直處於待命狀態,一旦有任務就去執行。
2)為什麽要有進程池?在實際業務中,任務量有多有少,如果任務特別多,不可能有正好開啟對應的任務。如果要開啟那麽多進程就會需要消耗大量的時間讓操作系統為你管理。其次還需要消耗大量的時間讓cpu調度。
因此,進程池就會幫程序員去管理池中的進程。
from multiprocessing import Pool
3)規則:進程池中所存放的進程數量最好是cpu核數 + 1
4)進程池中的三種方法:
(1)map (func, iterable)
func:進程池中的進程執行的任務函數
iterable:可叠代對象,把可叠代對象中的每個元素依次傳遞給任務函數。
(2)apply(func,agrs=())
同步的效率。進程回一個一個的執行任務
func:進程池中的進程執行的任務函數
args:可叠代對象型的參數,是傳給任務函數的參數
同步處理時,不需要close 和 join
同步處理時,進程池中的所有進程都是普通進程
(3)apply_async(func, args=()), cllback=None):
異步效率:進程池中的進程會同時去執行任務。
func: 進程池中的進程執行的函數
args:可叠代對象的參數,是傳給任務函數的參數
callback: 回調函數,就是每當進程池中的進程處理任務完成後,會返回結果給回調函數,you回調函數進一步處理,回調函數只有異步才有,同步沒有。
異步處理任務時:進程中的所有進程時守護進程。
異步處理任務是:必須加close 和 join
回調函數的使用:
進程的任務函數的返回值,被當成回調函數的形參被接收到,以此進行進一步操作處理
回調函數是由主進程調用的,而不是子進程,子進程只負責把結果返回給回調函數。
並發編程的那些事兒(四)