並發通信、生產者消費者模型
阿新 • • 發佈:2018-09-21
技術分享 strong 來講 ict ces nbsp 多進程之間通信 lease 聲明
多進程之間通信的限制
看一個例子:
import multiprocessing as mp data=666 def func(): global data data=222 p=mp.Process(target=func) p.start() p.join() print(data) >>>666
可以看到,聲明為global的data也沒有發生變化,輸出結果仍然是666,這正是多進程之間通信的限制,各個進程之間是相互獨立的,互不幹擾的內存空間。因此如果想要空想數據就必須開辟一段共享的內存空間。就要用到Manger對象。
Manger對象
我們常用的Manger對象空間有list(),dict(),Queue()三種,下面舉一個List()的簡單例子。
from multiprocessing import Process,Manager mgr=Manager() #創建服務器進程並返回通信的管理器 list_proxy=mgr.list() #通過管理器在列表中開辟空間,並返回一個代理 print(list_proxy) def func(list_ex): list_ex.append(‘a‘)
#把代理傳給子進程子進程就可以通過代理來訪問共享的內存空間了。 p=Process(target=func,args=(list_proxy,)) p.start() p.join() print(list_proxy)>>>[] [‘a‘]
線程間的共享與同步鎖
進程間如果不通過Manger對象是無法進行內存共享的,那麽對於線程呢?對於Python來講每一次只能執行一個線程,由於GIL鎖的存在。我們來看例子。
import threading data=666 def func(): global data data=222 t=threading.Thread(target=func) t.start() t.join() print(data) >>>222
我們看到結果輸出了222,也就是說全局對象更改了data的值,由此可見線程之間的內存是共享的。正是因為共享的便會出現資源競爭的問題,我們來看例子:
import threading data=0 n=10000000 #這個n必須足夠大才能看出效果 def add(n): global data for i in range(n): data+=1 def sub(n): global data for i in range(n): data-=1 a=threading.Thread(target=add,args=(n,)) s=threading.Thread(target=sub,args=(n,)) a.start() s.start() a.join() s.join() print(data) >>>-1561473
可以看到本來應該為0的值,在基數足夠大的時候就出現了問題,這就是由於線程之間的內存共享導致的,所以為了解決這一個問題就出現了同步鎖的概念,說白了就是加上鎖,然後控制資源的訪問權限這樣就會避免資源競爭的出現。看代碼。
import threading lock=threading.Lock() #生成一把鎖 data=0 n=10000000 def add(n): global data for i in range(n): lock.acquire() data+=1 lock.release() def sub(n): global data for i in range(n): lock.acquire() data-=1 lock.release() a=threading.Thread(target=add,args=(n,)) s=threading.Thread(target=sub,args=(n,)) a.start() s.start() a.join() s.join() print(data) >>>0
這樣通過鎖來訪問就正確的得出結果了,但是要記住一點加鎖之後要記得釋放,或者通過with語法這樣會自動幫你釋放。
with lock:
data-=1
線程與進程安全的隊列
隊列是一種常用的數據結構,原則是先進先出(FIFO)。
線程安全的隊列
主要方法包括:
- 入隊:put(item)
- 出隊:get()
- 測試空:empty() #近似
- 測試滿:full() #近似
- 隊列長度:qsize() #近似
- 任務結束:task_done()
- 等待完成:join()
進程安全隊列
進程的隊列要用到之前提到的Manger對象,mgr.Queue()
主要方法包括:
- 入隊:put(item)
- 出隊:get()
- 測試空:empty() #近似
- 測試滿:full() #近似
- 隊列長度:qsize() #近似
例子我們放到下面的生產者消費者模型中講解。
生產者消費者模型
何所謂生產者消費者模型?
就是說我們把進程之間的通信分開考慮,生產者只要往隊列裏面丟東西,消費者只要從隊列裏取東西,而二者不用考慮對方。
多線程實現
#生產者消費者模型 import queue import threading import random import time class Producer(threading.Thread): def __init__(self, queue): super().__init__() self.queue = queue def run(self): while True: #生成了一個數據 data = random.randint(0, 99) self.queue.put(data) #把數據丟進隊列中 print(‘生產者: 生產了:‘, data) time.sleep(1) class Concumer(threading.Thread): def __init__(self, queue): super().__init__() self.queue = queue def run(self): while True: item = self.queue.get() #從隊列中拿一個數據 print(‘消費者: 從隊列中拿到:‘, item) q = queue.Queue(5) #創建一個隊列 producer = Producer(q) #創建一個生產者 concumer = Concumer(q) #創建一個消費者 producer.start() concumer.start() >>>生產者: 生產了: 46 消費者: 從隊列中拿到: 46 生產者: 生產了: 9 消費者: 從隊列中拿到: 9 生產者: 生產了: 39 消費者: 從隊列中拿到: 39 生產者: 生產了: 89 消費者: 從隊列中拿到: 89
多進程實現
import multiprocessing import random import time class Producer(multiprocessing.Process): def __init__(self,queue): super().__init__() self.queue=queue def run(self): while True: data=random.randint(0,100) self.queue.put(data) print("生產者生產了數據{}".format(data)) time.sleep(1) class Consumer(multiprocessing.Process): def __init__(self,queue): super().__init__() self.queue=queue def run(self): while True: item=self.queue.get() print("消費者消費{}".format(item)) if __name__ == ‘__main__‘: manger = multiprocessing.Manager() queue_m = manger.Queue() producer=Producer(queue_m) consumer=Consumer(queue_m) producer.start() consumer.start() producer.join() consumer.join() >>>生產者生產了數據20 消費者消費20 生產者生產了數據62 消費者消費62 生產者生產了數據26 消費者消費26 生產者生產了數據36 消費者消費36 生產者生產了數據56 消費者消費56
並發通信、生產者消費者模型