day 32 管道,訊號量,程序池,執行緒的建立
阿新 • • 發佈:2019-01-14
1.管道(瞭解)
Pipe():
在程序之間建立一條通道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道.
from multiprocessing import Process,Pipe
conn1,conn2 = Pipe() 結構
主要方法:
conn1.recv():接受conn2.send(obj)傳送的物件.如果沒有訊息可接受, recv方法會一直阻塞.如果連線的另一端已經關閉,那麼recv方法會跑輸EOEError.
conn1.send(obj
):通過連線傳送物件.obj是與序列化相容的任意物件
from multiprocessing import Process,Pipe
def f1(conn):
from_zhujincheng = conn.recv()
print('我是子程序')
print('來自主程序的訊息:',from_zhujincheng)
if __name__ == '__main__':
conn1,conn2 = Pipe() #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的
#可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了
p1 = Process(target=f1,args=(conn2,))
p1.start()
conn1.send('小寶貝,你在哪')
print('我是主程序')
2.訊號量 Semaphore
互斥鎖同時只允許一個執行緒更改熟路,而訊號量時同事允許一定數量的執行緒更改資料
s=Semaphore() 內部維護了一個計數器,acquire -1,release +1,為0的時候,其他的程序都要在acquire之前等待
s.acquire()
需要鎖住的程式碼
s.release()
import time
import random
from multiprocessing import Process,Semaphore
def f1(i,s):
s.acquire()
print('%s男嘉賓到了' %i)
time.sleep(random.randint(1,3))
s.release()
if __name__ == '__main__':
s =Semaphore(4)
for i in range(10):
p = Process(target=f1,args=(i,s))
p.start()
3.事件 Event
e = Event() 初識狀態是false
E.wait() 當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行
E.set() 將事件物件的狀態更改為true
E.is_set() 檢視狀態
E.clear() 將事件物件的狀態更改為false
from multiprocessing import Process,Event
e = Event() # 建立事件物件,這個物件初識狀態為False
print('e的狀態是:',e.is_set())
print('程序執行到這裡')
e.set() #將e的狀態更改為True
print('e的狀態是:',e.is_set())
e.clear() #將e的狀態更改為False
e.wait() #e這個事件物件如果值為False,就在我加wait的地方等待
print('程序過了wait')
基於事件的程序通訊:
import time
from multiprocessing import Process,Event
def f1(e):
time.sleep(2)
n = 100
print('子程序計算結果為',n)
e.set()
if __name__ == '__main__':
e = Event()
p = Process(target=f1,args=(e,))
p.start()
print('主程序等待....')
e.wait()
print('結果已經寫入檔案,可以拿到值')
4.程序池(重點) Pool 池子
對比:多程序和多程序池的效率對比:程序的建立和銷燬時很消耗的,影響程式碼執行效率
pool.map() 引數資料必須是可迭代的,非同步提交任務,自帶close和join功能
import time
from multiprocessing import Process,Pool
# def f1(n):
# time.sleep(1)
# print(n)
#對比多程序和程序池的效率
def f1(n):
for i in range(5):
n = n + i
if __name__ == '__main__':
#統計程序池執行100個任務的時間
s_time = time.time()
pool = Pool(4) #裡面這個引數是指定程序池中有多少個程序用的,4表示4個程序,如果不傳引數,預設開啟的程序數一般是cpu的個數
# pool.map(f1,[1,2]) #引數資料必須是可迭代的
pool.map(f1,range(100)) #引數資料必須是可迭代的,非同步提交任務,自帶join功能
e_time = time.time()
dif_time = e_time - s_time
#統計100個程序,來執行100個任務的執行時間
p_s_t = time.time() #多程序起始時間
p_list = []
for i in range(100):
p = Process(target=f1,args=(i,))
p.start()
p_list.append(p)
# p.join()
[pp.join() for pp in p_list]
p_e_t = time.time()
p_dif_t = p_e_t - p_s_t
print('程序池的時間:',dif_time)
print('多程序的執行時間:',p_dif_t)
# 結果:
# 程序池的時間: 0.40102291107177734
# 多程序的執行時間: 9.247529029846191
程序池的同步和非同步的方法:
pool.apply() 同步方法,程序池的同步方法,將任務變成序列,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res
pool.apply_async()
非同步方法,,可以直接拿到結果物件,從結果物件裡面拿到結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待.
pool.close() 鎖住程序池,不在讓其他的持續往程式裡面人新任務,確保沒有新的任務交給程序池裡面的程序
join: 等待著程序池將自己裡面的任務都執行完
程序池的同步方法:
import time
from multiprocessing import Process,Pool
def f1(n):
time.sleep(1)
# print(n)
return n*n
if __name__ == '__main__':
pool = Pool(4)
for i in range(10):
print('xxxxxx')
res = pool.apply(f1,args=(i,))
print(res)
程序池的非同步方法:
import time
from multiprocessing import Process,Pool
def f1(n):
time.sleep(0.5)
# print(n)
return n*n
if __name__ == '__main__':
pool = Pool(4)
res_list = []
for i in range(10):
print('xxxx')
#非同步給程序池提交任務
res = pool.apply_async(f1,args=(i,))
res_list.append(res)
# print('等待所有任務執行完')
# pool.close() #鎖住程序池,意思就是不讓其他的程式再往這個程序池裡面提交任務了
# pool.join()
#列印結果,如果非同步提交之後的結果物件
for i in res_list:
print(i.get())
程序池的同步和非同步的綜合:
import time
from multiprocessing import Process,Pool
def f1(n):
time.sleep(0.5)
# print(n)
return n*n
if __name__ == '__main__':
pool =Pool(4)
# pool.apply(f1,args=(2,)) #同步方法
res_list = []
for i in range(10):
# print('任務%s' %i)
#程序池的同步方法,將任務變成序列
# res = pool.apply(f1,args=(i,))
# print(res)
#程序池的非同步方法
res =pool.apply_async(f1,args=(i,))
print(res)
# as_result =res.get() #join的效果
# print(as_result)
res_list.append(res)
pool.close() # 鎖住程序池,不在讓其他的程式網裡面仍新的任務了,確保沒有新的任務交給程序池裡面的程序
pool.join()
for r in res_list:
print(r.get())
time.sleep(2)
#主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務
print('主程序直接結束')
p = Process(target=f1,)
5.程序池的回撥函式
Apply_async(f1,args=(i,),callback=function)
#
將前面
f1
這個任務的返回結果作為引數傳給
callback
指定的那個
function
函式
import os
from multiprocessing import Pool,Process
def f1(n):
print('程序池裡面的程序id',os.getpid())
print('>>>>',n)
return n*n
def call_back_func(asdf):
print('>>>>>>>>>>>>>',os.getpid())
print('回撥函式中的結果:',asdf)
# print('回撥函式中的結果:',s.get())
if __name__ == '__main__':
pool = Pool(4)
res = pool.apply_async(f1,args=(5,),callback=call_back_func)
pool.close()
pool.join()
# print(res.get())
print('主程序的程序id',os.getpid())
1.管道(瞭解) Pipe(): 在程序之間建立一條通道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道. from multiprocessing import Process,Pipe conn1,conn2 = Pipe() 結構 主要方法: conn1.recv():接受conn2.send(obj)傳送的物件.如果沒有訊息可接受, recv方法會一直阻塞.如果連線的另一端已經關閉,那麼recv方法會跑輸EOEError. conn1.send(obj ):通過連線傳送物件.obj是與序列化相容的任意物件 from multiprocessing import Process,Pipe def f1(conn): from_zhujincheng = conn.recv() print('我是子程序') print('來自主程序的訊息:',from_zhujincheng) if __name__ == '__main__': conn1,conn2 = Pipe() #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的 #可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了 p1 = Process(target=f1,args=(conn2,)) p1.start() conn1.send('小寶貝,你在哪') print('我是主程序') 2.訊號量 Semaphore 互斥鎖同時只允許一個執行緒更改熟路,而訊號量時同事允許一定數量的執行緒更改資料 s=Semaphore() 內部維護了一個計數器,acquire -1,release +1,為0的時候,其他的程序都要在acquire之前等待 s.acquire() 需要鎖住的程式碼 s.release() import time import random from multiprocessing import Process,Semaphore def f1(i,s): s.acquire() print('%s男嘉賓到了' %i) time.sleep(random.randint(1,3)) s.release() if __name__ == '__main__': s =Semaphore(4) for i in range(10): p = Process(target=f1,args=(i,s)) p.start() 3.事件 Event e = Event() 初識狀態是false E.wait() 當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行 E.set() 將事件物件的狀態更改為true E.is_set() 檢視狀態 E.clear() 將事件物件的狀態更改為false from multiprocessing import Process,Event e = Event() # 建立事件物件,這個物件初識狀態為False print('e的狀態是:',e.is_set()) print('程序執行到這裡') e.set() #將e的狀態更改為True print('e的狀態是:',e.is_set()) e.clear() #將e的狀態更改為False e.wait() #e這個事件物件如果值為False,就在我加wait的地方等待 print('程序過了wait') 基於事件的程序通訊: import time from multiprocessing import Process,Event def f1(e): time.sleep(2) n = 100 print('子程序計算結果為',n) e.set() if __name__ == '__main__': e = Event() p = Process(target=f1,args=(e,)) p.start() print('主程序等待....') e.wait() print('結果已經寫入檔案,可以拿到值') 4.程序池(重點) Pool 池子 對比:多程序和多程序池的效率對比:程序的建立和銷燬時很消耗的,影響程式碼執行效率 pool.map() 引數資料必須是可迭代的,非同步提交任務,自帶close和join功能 import time from multiprocessing import Process,Pool # def f1(n): # time.sleep(1) # print(n) #對比多程序和程序池的效率 def f1(n): for i in range(5): n = n + i if __name__ == '__main__': #統計程序池執行100個任務的時間 s_time = time.time() pool = Pool(4) #裡面這個引數是指定程序池中有多少個程序用的,4表示4個程序,如果不傳引數,預設開啟的程序數一般是cpu的個數 # pool.map(f1,[1,2]) #引數資料必須是可迭代的 pool.map(f1,range(100)) #引數資料必須是可迭代的,非同步提交任務,自帶join功能 e_time = time.time() dif_time = e_time - s_time #統計100個程序,來執行100個任務的執行時間 p_s_t = time.time() #多程序起始時間 p_list = [] for i in range(100): p = Process(target=f1,args=(i,)) p.start() p_list.append(p) # p.join() [pp.join() for pp in p_list] p_e_t = time.time() p_dif_t = p_e_t - p_s_t print('程序池的時間:',dif_time) print('多程序的執行時間:',p_dif_t) # 結果: # 程序池的時間: 0.40102291107177734 # 多程序的執行時間: 9.247529029846191 程序池的同步和非同步的方法: pool.apply() 同步方法,程序池的同步方法,將任務變成序列,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res pool.apply_async() 非同步方法,,可以直接拿到結果物件,從結果物件裡面拿到結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待. pool.close() 鎖住程序池,不在讓其他的持續往程式裡面人新任務,確保沒有新的任務交給程序池裡面的程序 join: 等待著程序池將自己裡面的任務都執行完 程序池的同步方法: import time from multiprocessing import Process,Pool def f1(n): time.sleep(1) # print(n) return n*n if __name__ == '__main__': pool = Pool(4) for i in range(10): print('xxxxxx') res = pool.apply(f1,args=(i,)) print(res) 程序池的非同步方法: import time from multiprocessing import Process,Pool def f1(n): time.sleep(0.5) # print(n) return n*n if __name__ == '__main__': pool = Pool(4) res_list = [] for i in range(10): print('xxxx') #非同步給程序池提交任務 res = pool.apply_async(f1,args=(i,)) res_list.append(res) # print('等待所有任務執行完') # pool.close() #鎖住程序池,意思就是不讓其他的程式再往這個程序池裡面提交任務了 # pool.join() #列印結果,如果非同步提交之後的結果物件 for i in res_list: print(i.get()) 程序池的同步和非同步的綜合: import time from multiprocessing import Process,Pool def f1(n): time.sleep(0.5) # print(n) return n*n if __name__ == '__main__': pool =Pool(4) # pool.apply(f1,args=(2,)) #同步方法 res_list = [] for i in range(10): # print('任務%s' %i) #程序池的同步方法,將任務變成序列 # res = pool.apply(f1,args=(i,)) # print(res) #程序池的非同步方法 res =pool.apply_async(f1,args=(i,)) print(res) # as_result =res.get() #join的效果 # print(as_result) res_list.append(res) pool.close() # 鎖住程序池,不在讓其他的程式網裡面仍新的任務了,確保沒有新的任務交給程序池裡面的程序 pool.join() for r in res_list: print(r.get()) time.sleep(2) #主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務 print('主程序直接結束') p = Process(target=f1,) 5.程序池的回撥函式 Apply_async(f1,args=(i,),callback=function) # 將前面 f1 這個任務的返回結果作為引數傳給 callback 指定的那個 function 函式 import os from multiprocessing import Pool,Process def f1(n): print('程序池裡面的程序id',os.getpid()) print('>>>>',n) return n*n def call_back_func(asdf): print('>>>>>>>>>>>>>',os.getpid()) print('回撥函式中的結果:',asdf) # print('回撥函式中的結果:',s.get()) if __name__ == '__main__': pool = Pool(4) res = pool.apply_async(f1,args=(5,),callback=call_back_func) pool.close() pool.join() # print(res.get()) print('主程序的程序id',os.getpid())