1. 程式人生 > >day32 訊號量 事件 管道 程序池

day32 訊號量 事件 管道 程序池

今日主要內容:

1.管道(Pipe) 資料接收一次就沒有了

2.事件(Event)

3.基於事件的程序通訊

4.訊號量(Semaphore)

5. 程序池(重點)

6.程序池的同步方法和非同步方法

7. 程序池的回撥函式,( pool.apply_async(f1,args=(1,),callback=f2) ) ,回撥函式是在非同步程序中的函式


1.管道(Pipe) 資料接收一次就沒有了

Pipe() #建立管道,全雙工,返回管道的兩端,但是一端傳送訊息,只能是另一端才能接受到,自己這一端是接收不到的

from multiprocessing import Process,Pipe
def f1(conn):
from_zhujincheng=conn.recv()# 此處接收可以接收任何資料,而且不需要帶最大的傳輸的大小
print('我是子程序')
print('來自主程序的訊息>>>',from_zhujincheng)

if __name__=='__main__':
conn1,conn2=Pipe() # 建立管道,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另一端接收,自己這一端是不能接受的
# 可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了 但是這樣會導致資料的不安全,容易導致程式錯亂

p1=Process(target=f1,args=(conn1,) )
p1.start()
conn2.send('你好啊,大兄嘚')
print('我是主程序')

2.事件(Event)

Event() #建立事件物件,這個物件的初始狀態為False

from multiprocess import Process,Event

e=Event() # 建立事件物件,這個物件的初始狀態為False
print('判斷e的狀態>>>' , e.is_set()) #判斷e當前的狀態
print('程式執行到這裡了')
e.set() # 這裡的作用是將物件的狀態改為True
e.clear() # 將e的狀態改為False
e.wait() # 如果程式中e的狀態為False,那麼系統會阻塞在這裡,只有改為True的時候才會繼續往下執行
print('程序走過了wait.')

3.基於事件的程序通訊
import time
from multiprocessing import Process,Event

def func(e):
time.sleep(1)
n=100
print('子程序計算結果為',n)
e.set() # 將e的值修改為True

if __name__=='__main__':
e=Event() # 建立事件物件,初始狀態為False
p=Process(target=func,args=(e,))
p.start()

print('主程式在等待中......')
e.wait() # 等待e的值變為True
print('結果執行完畢,可以拿到這個值')

4.訊號量(Semaphore)

S = semphore(4),內部維護了一個計數器,acquire-1,release+1,為0的時候,其他的程序都要在acquire之前等待
S.acquire()
需要鎖住的程式碼
S.release()

Semaphore(4) # 計數器4,acquire一次減一,如果semaphore的值為0,其他人等待,release加一 . 搶票啊啥的一般用這個,就是多個程式搶佔這4個cpu,如果4個都被佔用了,那麼他們剩餘的全部等待,只有當一個人執行完畢的時候,剩下的人才會繼續爭搶這個cpu.

import time
import random
from multiprocessing import Process,Semaphore
def func():
s.acquire() # 與程序鎖的用法類似
print(f'{i}號男嘉賓登場')
time.sleep(random.randint(1,3)) # 隨機
s.release() #與程序鎖的用法類似
if __name__=='__main__':
s=Semaphore(3) # 計數器3,每當acquire一次,計數器就減一,當release一次,計數器就加一. 如果計數器為0,那麼意味著所有的計數器都被佔用,其他的程式都要等待這3個程式,如果一個執行完畢後,剩下的才可以繼續爭搶這個名額.
for i in range(10): # 建立了10個程序,
p=Process(target=func,args=(i,))
p.start()

5. 程序池(重點)

程序的建立和銷燬是很有消耗的,影響程式碼執行效率
程序池:
Map:非同步提交任務,並且傳參需要可迭代型別的資料,自帶close和join功能
Res = Apply(f1,args=(i,)) #同步執行任務,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res

Res_obj = Apply_async(f1,args=(i,)) #非同步提交任務,可以直接拿到結果物件,從結果物件裡面拿結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待

Close : 鎖住程序池,防止有其他的新的任務在提交給程序池
Join : 等待著程序池將自己裡面的任務都執行完

回撥函式:
Apply_async(f1,args=(i,),callback=function) #將前面f1這個任務的返回結果作為引數傳給callback指定的那個function函式


程序池的map用法 (Pool)

# 對比多程序和程序池的效率,統計程序池和多程序執行100個任務的時間
import time
from multiprocessing import Process,Pool
def func(i):
time.sleep(0.5)
for a in range(10):
n=n+i

if __name__=='__main__':
# 程序池執行100個任務的時間
s_time=time.time()# 記錄開始的時間
p=Pool(4) # 裡面的引數是指定程序池中有多少個程序用的,4表示4個程序,如果不傳引數,那麼預設開啟的程序數就是你電腦的cpu個數
p.map(func,range(100)) # map 2個引數第一個是你要執行的函式,第二個必須是可迭代的 ,非同步提交任務,自帶join功能
e_time=time.time()# 記錄結束的時間
res_time=e_time-s_time
print('程序池執行命令所用時間>>>',res_time)

#多程序執行100個任務的時間
p_s_t=time.time()
p_list=[]
for i in range(100):
p=Process(target=func,args=(i,))
p.start()
p_list.append(p)

[pp.join() for pp in p_list]
p_e_t=time.time()

res_t = p_e_t - p_s_t

print('多程序執行命令所用的時間>>>',res_t)

# 結果
程序池執行命令所用時間:0.40s
多程序執行命令所用時間:9.24s
所以說程序池的時間比多程序的時間快了近十倍,所以......你懂我的意思吧,兄嘚

6.程序池的同步方法和非同步方法
apply() 同步方法,將任務變成了序列
apply_async()非同步方法

# 同步方法
import time
from multiprocessing import Process,Pool
def func(n):
time.sleep(1)
# print(n)
return n*n

if __name__=='__main__':
po=Pool(4)
for i in range(10):
print('你好啊,大哥')
res=po.apply(func,args=(i,)) #程序池的同步方法,將任務變成了序列
print(res)


#非同步方法
import time
from multiprocessing import Process,Pool

def func():
pass

if __name__=='__main__':
po=Pool(4)
p_list=[]
for i in range(10):
print('你能每次都看到我嗎?')
res=po.apply_async(func,args=(i,)) #非同步給程序池提交任務
p_list.append(res)

po.close()#鎖住程序池,意思就是不讓其他的長鬚再往這個程序池裡面提交任務了
po.join() # 必須等待子程式執行完畢才可以執行主程式

#列印結果,如果非同步提交之後的結果物件
for i in p_list:
print(i.get())# 從物件中拿值需要用到get方法,get的效果是join的效果
# 主程式執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務
print('主程序直接結束')


7. 程序池的回撥函式,( pool.apply_async(f1,args=(1,),callback=f2) ) ,回撥函式是在非同步程序中的函式

所謂的回撥函式其實指的是,在主程序中第一個函式算出的值,被回撥函式把結果傳入到第二個函式中進行計算.

import os
from multiprocess import Process,Pool

def func(n):
s=n+1
return s**2

def func2(x):
print('回撥函式中的結果>>>',x)

if __name__=='__main__':
po=Pool(4)
po.apply_async(func,args=(3,),callback=func2)
po.close()
po.join()

print('主程序的id',os.getpid())