day40 Pyhton 併發程式設計03
一.內容回顧
程序是計算機中最小的資源分配單位
程序與程序之間資料隔離,執行過程非同步
為什麼會出現程序的概念?
為了合理利用cpu,提高使用者體驗
多個程序是可以同時利用多個cpu的,可以實現並行的效果
殭屍程序
程序 狀態碼 z/z 殭屍程序 linux
在主程序中控制子程序的方法
子程序物件 = Process (target,args) 在建立的這一刻根本就沒有通知作業系統
子程序物件. start() 通知作業系統,開啟子程序,非同步非阻塞
子程序物件.terminate() 通知作業系統,結束子程序,非同步非阻塞
子程序物件.is_alive() 檢視子程序是否還活著
子程序物件.join() 阻塞,直到子程序結束
子程序物件.join(timeout = 10) 阻塞最多10s,期間子程序如果結束就結束阻塞,如果沒結束10s之後也結束阻塞
# 守護程序 # 守護程序是一個子程序 # 守護程序會在主程序程式碼結束之後才結束 # 為什麼會這樣? # 由於主程序必須要回收所有的子程序的資源 # 所以主程序必須在子程序結束之後才能結束 # 而守護程序就是為了守護主程序存在的# 不能守護到主程序結束,就只能退而求其次,守護到程式碼結束了 # 守護到主程序的程式碼結束,意味著如果有其他子程序沒有結束,守護程序無法繼續守護 # 解決方案 : 在主程序中加入對其他子程序的join操作,來保證守護程序可以守護所有主程序和子程序的執行 # 如何設定守護程序 # 子程序物件.daemon = True 這句話寫在start之前
# 鎖 # 為什麼要用鎖? # 由於多個程序的併發,導致很多資料的操作都在同時進行 # 所以就有可能產生多個程序同時操作 : 檔案\資料庫 中的資料 #導致資料不安全 # 所以給某一段修改資料的程式加上鎖,就可以控制這段程式碼永遠不會被多個程序同時執行 # 保證了資料的安全 # Lock 鎖(互斥鎖) # 鎖實際上是把你的某一段程式變成同步的了,降低了程式執行的速度,為了保證資料的安全性 # 沒有資料安全的效率都是耍流氓
訊號量
# 對於鎖 保證一段程式碼同一時刻只能有一個程序執行 # 對於訊號量 保證一段程式碼同一時刻只能有n個程序執行 # 流量控制 # 10個程序
from multiprocessing import Semaphore sem = Semaphore(4) sem.acquire() print('拿走一把鑰匙1') sem.acquire() print('拿走一把鑰匙2') sem.acquire() print('拿走一把鑰匙3') sem.acquire() print('拿走一把鑰匙4') sem.release()#釋放訊號
sem.acquire() print('拿走一把鑰匙5')
訊號量Semaphore是同時允許一定數量的執行緒更改資料
訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。
import time import random from multiprocessing import Process,Semaphore def ktv(name,sem): sem.acquire() print("%s走進了ktv"%name) time.sleep(random.randint(5,10)) print("%s走出了ktv" % name) sem.release() if __name__ == '__main__': sem = Semaphore(4) for i in range(100): p = Process(target=ktv,args = ('name%s'%i,sem)) p.start() name1走進了ktv name0走進了ktv name3走進了ktv name4走進了ktv name1走出了ktv name2走進了ktv name3走出了ktv
事件Event 事件類
e = Event()
e為事件物件,事件本身就帶著標識:False
wait 阻塞
它的阻塞條件是 物件標識為False
結束阻塞條件是 物件標識為True
物件的標識相關的
set 將物件的標識設定為True
clear 將物件的標識設定為False
is_set 檢視物件的標識是否為True
import time
import random
from multiprocessing import Event,Process def traffic_light(e): print('\033[1;31m紅燈亮\033[0m') while True: time.sleep(2) if e.is_set(): # 如果當前是綠燈 print('\033[1;31m紅燈亮\033[0m') # 先列印紅燈亮 e.clear() # 再把燈改成紅色 else : # 當前是紅燈 print('\033[1;32m綠燈亮\033[0m') # 先列印綠燈亮 e.set() # 再把燈變綠色 def car(e,carname): if not e.is_set(): print('%s正在等待通過'%carname) e.wait() print('%s正在通過'%carname) if __name__ == '__main__': e = Event() p = Process(target=traffic_light,args = (e,)) p.start() for i in range(100): time.sleep(random.randrange(0,3)) p = Process(target=car, args=(e,'car%s'%i)) p.start()
標識 控制wait是否阻塞的關鍵
如何修改這個標識 : clear set
如何檢視這個標識 : is_set
程序之間的資料通訊 IPC
管道 Pipe
佇列 Queue
from multiprocessing import Queue,Process def consumer(q): print( '子程序 :', q.get() ) if __name__ == '__main__': q = Queue() p = Process(target=consumer,args=(q,)) p.start() q.put('hello,world')
# 生產者消費者模型 import time from multiprocessing import Queue,Process def producer(name,food,num,q): '''生產者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生產了%s'%(name,foodi)) q.put(foodi) def consumer(name,q): while True: food = q.get() # 等待接收資料 if food == None:break print('%s吃了%s'%(name,food)) time.sleep(1) if __name__ == '__main__': q = Queue(maxsize=10) p1 = Process(target=producer,args = ('寶元','泔水',20,q)) p2 = Process(target=producer,args = ('戰山','魚刺',10,q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) p1.start() # 開始生產 p2.start() # 開始生產 c1.start() c2.start() p1.join() # 生產者結束生產了 p2.join() # 生產者結束生產了 q.put(None) # put None 操作永遠放在所有的生產者結束生產之後 q.put(None) # 有幾個消費者 就put多少個None
佇列為空不準確
q.qsize() 佇列的大小 #
q.full() 是否滿了 滿返回True
q.empty() 是否空了 空返回True
import time from multiprocessing import JoinableQueue,Process def consumer(name,q): while True: food = q.get() time.sleep(1) print('%s消費了%s'%(name,food)) q.task_done() def producer(name,food,num,q): '''生產者''' for i in range(num): time.sleep(0.3) foodi = food + str(i) print('%s生產了%s'%(name,foodi)) q.put(foodi) q.join() # 消費者消費完畢之後會結束阻塞 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=('寶元', '泔水', 20, q)) c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('wusir', q)) c1.daemon = True c2.daemon = True p1.start() c1.start() c2.start() p1.join()
消費者每消費一個數據會給佇列傳送一條資訊
當每一個數據都被消費掉之後 joinablequeue的join阻塞行為就會結束
以上就是為什麼我們要在生產完所有資料的時候發起一個q.join()
隨著生產者子程序的執行完畢,說明消費者的資料都消費完畢了
這個時候主程序中的p1.join結束
主程序的程式碼結束
守護程序也結束了
程序之間的資料共享
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加鎖而操作共享的資料,肯定會出現資料錯亂#{'count': 5} d['count']-=1 # lock.acquire()#與上面意思一樣 # d['count'] -= 1 # lock.release() if __name__ == '__main__': lock=Lock() m = Manager() dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
# Manager是一個類 內部有一些資料型別能夠實現程序之間的資料共享 # dict list這樣的資料 內部的數字進行自加 自減 是會引起資料不安全的,這種情況下 需要我們手動加鎖完成 # 因此 我們一般情況下 不適用這種方式來進行程序之間的通訊 # 我們寧可使用Queue佇列或者其他訊息中介軟體 來實現訊息的傳遞 保證資料的安全
程序池
multiprocess.Pool模組
為什麼要有程序池?程序池的概念
在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序
程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。
import os import time from multiprocessing import Pool # 池 def func(i): i -= 1 return i**2 # 你的池中打算放多少個程序,個數cpu的個數 * 1|2 if __name__ == '__main__': p = Pool(5) ret = p.map(func,range(100)) # 自動帶join print(ret)
有了程序池,不僅可以只開有限的程序來完成無限的任務
還可以獲取程式執行的返回值
如果沒有池幫助你實現功能,那麼你自己能不能實現???
通過佇列
# 同步方式向程序池提交任務 import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(0.5) return i**2 # 你的池中打算放多少個程序,個數cpu的個數 * 1|2 if __name__ == '__main__': p = Pool(5) for i in range(100): ret = p.apply(func,args=(i,)) # 自動帶join 序列 同步 apply就是同步提交任務 print(ret)
# 非同步方式向程序池提交任務 import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(0.1) print(i) return i**2 # 你的池中打算放多少個程序,個數cpu的個數 * 1|2 if __name__ == '__main__': p = Pool(5) for i in range(100): ret = p.apply_async(func,args=(i,)) # 自動帶join 非同步的 apply_async非同步提交任務 print(ret) p.close() # 關閉程序池的任務提交 從此之後不能再向p這個池提交新的任務 p.join() # 阻塞 一直到所有的任務都執行完
# 非同步方式向程序池提交任務並且獲取返回值 import time from multiprocessing import Pool # 池 def func(i): i -= 1 time.sleep(1) return i**2 # 你的池中打算放多少個程序,個數cpu的個數 * 1|2 if __name__ == '__main__': p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,)) # 自動帶join 非同步的 apply_async非同步提交任務 l.append(ret) for ret in l: print(ret.get())
# 為什麼要用程序池? # 任務很多 cpu個數*5個任務以上 # 為了節省建立和銷燬程序的時間 和 作業系統的資源 # 一般程序池中程序的個數: # cpu的1-2倍 # 如果是高計算,完全沒有io,那麼就用cpu的個數 # 隨著IO操作越多,可能池中的程序個數也可以相應增加 # 向程序池中提交任務的三種方式 # map 非同步提交任務 簡便演算法 接收的引數必須是 子程序要執行的func,可迭代的(可迭代中的每一項都會作為引數被傳遞給子程序) # 能夠傳遞的引數是有限的,所以比起apply_async限制性比較強 # apply 同步提交任務(你刪了吧) # apply_async 非同步提交任務 # 能夠傳遞比map更豐富的引數,但是比較麻煩 # 首先 apply_async提交的任務和主程序完全非同步 # 可以通過先close程序池,再join程序池的方式,強制主程序等待程序池中任務的完成 # 也可以通過get獲取返回值的方式,來等待任務的返回值 # 我們不能在apply_async提交任務之後直接get獲取返回值 # for i in range(100): # ret = p.apply_async(func,args=(i,)) # 自動帶join 非同步的 apply_async非同步提交任務 # l.append(ret) # for ret in l: # print(ret.get())
# 回撥函式 import os import time import random from multiprocessing import Pool # 池 def func(i): # [2,1,1,5,0,0.2] i -= 1 time.sleep(random.uniform(0,2)) return i**2 def back_func(args): print(args,os.getpid()) if __name__ == '__main__': print(os.getpid()) p = Pool(5) l = [] for i in range(100): ret = p.apply_async(func,args=(i,),callback=back_func) # 5個任務 p.close() p.join() # callback回撥函式 # 主動執行func,然後在func執行完畢之後的返回值,直接傳遞給back_func作為引數,呼叫back_func # 處理池中任務的返回值 # 回撥函式是由誰執行的? 主程序