1. 程式人生 > >生產者消費者模型 線程池

生產者消費者模型 線程池

進一步 子進程 lba RoCE 就會 生產者消費者 bsp nbsp 獲取數據

1.生產者消費者模型

  主要是為解耦

  借助隊列來實現生產者消費這模型

  棧:先進後出(First In Last Out 簡稱:FILO)

  隊列:先進先出(FIFO)

import queue

  from multiprocessing import Queue 借助Queue解決生產者消費這模型隊列是安全的

  q=Queue(m)

  

q = Queue(num)
num : 隊列的最大長度
q.get()# 阻塞等待獲取數據,如果有數據直接獲取,如果沒有數據,阻塞等待
q.put()# 阻塞,如果可以繼續往隊列中放數據,就直接放,不能放就阻塞等待

q.get_nowait()# 不阻塞,如果有數據直接獲取,沒有數據就報錯
q.put_nowait()# 不阻塞,如果可以繼續往隊列中放數據,就直接放,不能放就報錯

(2)from multiprocessing import JoinableQueue#可連接的隊列
JoinableQueue是繼承Queue,所以可以使用Queue中的方法
並且JoinableQueue又多了兩個方法
q.join()# 用於生產者。等待 q.task_done的返回結果,通過返回結果,生產者就能獲得消費者當前消費了多少個數據
q.task_done() # 用於消費者,是指每消費隊列中一個數據,就給join返回一個標識。

2.管道

from multiprocessing import Pipe
con1,con2 = Pipe()
管道是不安全的。
管道是用於多進程之間通信的一種方式。
如果在單進程中使用管道,那麽就是con1收數據,就是con2發數據。
如果是con1發數據,就是con2收數據

如果在多進程中使用管道,那麽就必須是父進程使用con1收,子進程就必須使用con2發
父進程使用con1發,子進程就必須使用con2收
父進程使用con2收,子進程就必須使用con1發
父進程使用con2發,子進程就必須使用con1收
在管道中有一個著名的錯誤叫做EOFError。是指,父進程中如果關閉了發送端,子進程還繼續接收數據,那麽就會引發EOFError。

3 進程之間的共享內存
from multiprocessing import Manager,Value
m = Manager()
num = m.dict({鍵 : 值})
num = m.list([1,2,3])

4 進程池
進程池:一個池子,裏邊有固定數量的進程。這些進程一直處於待命狀態,一旦有任務來,馬上就有進程去處理。
因為在實際業務中,任務量是有多有少的,如果任務量特別的多,不可能要開對應那麽多的進程數
開啟那麽多進程首先就需要消耗大量的時間讓操作系統來為你管理它。其次還需要消耗大量時間讓
cpu幫你調度它。
進程池還會幫程序員去管理池中的進程。
from multiprocessing import Pool
p = Pool(os.cpu_count() + 1)

進程池有三個方法:
map(func,iterable)
func:進程池中的進程執行的任務函數
iterable: 可叠代對象,是把可叠代對象中的每個元素依次傳給任務函數當參數

apply(func,args=()): 同步的效率,也就是說池中的進程一個一個的去執行任務
func:進程池中的進程執行的任務函數
args: 可叠代對象型的參數,是傳給任務函數的參數
同步處理任務時,不需要close和join
同步處理任務時,進程池中的所有進程是普通進程(主進程需要等待其執行結束)

apply_async(func,args=(),callback=None): 異步的效率,也就是說池中的進程一次性都去執行任務
func:進程池中的進程執行的任務函數
args: 可叠代對象型的參數,是傳給任務函數的參數
callback: 回調函數,就是說每當進程池中有進程處理完任務了,返回的結果可以交給回調函數,由回調函數進行進一步的處理,回調函數只有異步才有,同步是沒有的
異步處理任務時,進程池中的所有進程是守護進程(主進程代碼執行完畢守護進程就結束)
異步處理任務時,必須要加上close和join

回調函數的使用:
進程的任務函數的返回值,被當成回調函數的形參接收到,以此進行進一步的處理操作
回調函數是由主進程調用的,而不是子進程,子進程只負責把結果傳遞給回調函數

生產者消費者模型 線程池