1. 程式人生 > >day 32 管道,訊號量,程序池,執行緒的建立

day 32 管道,訊號量,程序池,執行緒的建立

1.管道(瞭解) Pipe():  在程序之間建立一條通道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道. from multiprocessing import Process,Pipe conn1,conn2 = Pipe()  結構 主要方法: conn1.recv():接受conn2.send(obj)傳送的物件.如果沒有訊息可接受, recv方法會一直阻塞.如果連線的另一端已經關閉,那麼recv方法會跑輸EOEError.
conn1.send(obj ):通過連線傳送物件.obj是與序列化相容的任意物件 from multiprocessing import Process,Pipe     def f1(conn):       from_zhujincheng = conn.recv()       print('我是子程序')     print('來自主程序的訊息:',from_zhujincheng)     if __name__ == '__main__':     conn1,conn2 = Pipe()  #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的       #可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了     p1 = Process(target=f1,args=(conn2,))     p1.start()       conn1.send('小寶貝,你在哪')       print('我是主程序') 2.訊號量  Semaphore
互斥鎖同時只允許一個執行緒更改熟路,而訊號量時同事允許一定數量的執行緒更改資料 s=Semaphore() 內部維護了一個計數器,acquire -1,release +1,為0的時候,其他的程序都要在acquire之前等待 s.acquire() 需要鎖住的程式碼 s.release() import time import random from multiprocessing import Process,Semaphore   def f1(i,s):     s.acquire()
    print('%s男嘉賓到了' %i)     time.sleep(random.randint(1,3))     s.release()   if __name__ == '__main__':     s =Semaphore(4)       for i in range(10):         p = Process(target=f1,args=(i,s))           p.start() 3.事件 Event e = Event()  初識狀態是false E.wait() 當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行 E.set()  將事件物件的狀態更改為true E.is_set() 檢視狀態 E.clear() 將事件物件的狀態更改為false from multiprocessing import Process,Event     e = Event() # 建立事件物件,這個物件初識狀態為False print('e的狀態是:',e.is_set())   print('程序執行到這裡') e.set() #將e的狀態更改為True print('e的狀態是:',e.is_set())   e.clear() #將e的狀態更改為False e.wait()  #e這個事件物件如果值為False,就在我加wait的地方等待 print('程序過了wait')  基於事件的程序通訊: import time from multiprocessing import  Process,Event   def f1(e):     time.sleep(2)     n = 100     print('子程序計算結果為',n)     e.set()   if __name__ == '__main__':     e = Event()     p = Process(target=f1,args=(e,))     p.start()       print('主程序等待....')     e.wait()     print('結果已經寫入檔案,可以拿到值') 4.程序池(重點) Pool 池子 對比:多程序和多程序池的效率對比:程序的建立和銷燬時很消耗的,影響程式碼執行效率 pool.map() 引數資料必須是可迭代的,非同步提交任務,自帶close和join功能 import time from multiprocessing import Process,Pool     # def f1(n): #     time.sleep(1) #     print(n)   #對比多程序和程序池的效率 def f1(n):     for i in range(5):         n = n + i if __name__ == '__main__':     #統計程序池執行100個任務的時間     s_time = time.time()     pool = Pool(4)  #裡面這個引數是指定程序池中有多少個程序用的,4表示4個程序,如果不傳引數,預設開啟的程序數一般是cpu的個數     # pool.map(f1,[1,2])  #引數資料必須是可迭代的     pool.map(f1,range(100))  #引數資料必須是可迭代的,非同步提交任務,自帶join功能     e_time = time.time()     dif_time = e_time - s_time       #統計100個程序,來執行100個任務的執行時間     p_s_t = time.time() #多程序起始時間     p_list = []     for i in range(100):         p = Process(target=f1,args=(i,))         p.start()         p_list.append(p)         # p.join()     [pp.join() for pp in p_list]     p_e_t = time.time()     p_dif_t = p_e_t - p_s_t     print('程序池的時間:',dif_time)     print('多程序的執行時間:',p_dif_t)     # 結果:     # 程序池的時間: 0.40102291107177734     # 多程序的執行時間: 9.247529029846191 程序池的同步和非同步的方法: pool.apply()  同步方法,程序池的同步方法,將任務變成序列,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res pool.apply_async()  非同步方法,,可以直接拿到結果物件,從結果物件裡面拿到結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待. pool.close()  鎖住程序池,不在讓其他的持續往程式裡面人新任務,確保沒有新的任務交給程序池裡面的程序 join: 等待著程序池將自己裡面的任務都執行完 程序池的同步方法: import time from multiprocessing import Process,Pool     def f1(n):     time.sleep(1)     # print(n)     return n*n   if __name__ == '__main__':       pool = Pool(4)       for i in range(10):         print('xxxxxx')         res = pool.apply(f1,args=(i,))         print(res) 程序池的非同步方法:   import time from multiprocessing import Process,Pool     def f1(n):     time.sleep(0.5)     # print(n)     return n*n   if __name__ == '__main__':       pool = Pool(4)       res_list = []     for i in range(10):         print('xxxx')         #非同步給程序池提交任務         res = pool.apply_async(f1,args=(i,))         res_list.append(res)       # print('等待所有任務執行完')     # pool.close()  #鎖住程序池,意思就是不讓其他的程式再往這個程序池裡面提交任務了     # pool.join()       #列印結果,如果非同步提交之後的結果物件     for i in res_list:         print(i.get()) 程序池的同步和非同步的綜合: import time from multiprocessing import Process,Pool   def f1(n):     time.sleep(0.5)     # print(n)     return  n*n   if __name__ == '__main__':     pool =Pool(4)     # pool.apply(f1,args=(2,))  #同步方法     res_list = []     for i in range(10):         # print('任務%s' %i)         #程序池的同步方法,將任務變成序列         # res = pool.apply(f1,args=(i,))         # print(res)         #程序池的非同步方法         res =pool.apply_async(f1,args=(i,))         print(res)         # as_result =res.get()  #join的效果         # print(as_result)         res_list.append(res)       pool.close() # 鎖住程序池,不在讓其他的程式網裡面仍新的任務了,確保沒有新的任務交給程序池裡面的程序     pool.join()       for r in res_list:         print(r.get())       time.sleep(2)     #主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務     print('主程序直接結束')     p = Process(target=f1,) 5.程序池的回撥函式   Apply_async(f1,args=(i,),callback=function)   # 將前面 f1 這個任務的返回結果作為引數傳給 callback 指定的那個 function 函式 import os from multiprocessing import Pool,Process   def f1(n):     print('程序池裡面的程序id',os.getpid())     print('>>>>',n)     return n*n   def call_back_func(asdf):     print('>>>>>>>>>>>>>',os.getpid())     print('回撥函式中的結果:',asdf)     # print('回撥函式中的結果:',s.get())   if __name__ == '__main__':     pool = Pool(4)     res = pool.apply_async(f1,args=(5,),callback=call_back_func)     pool.close()     pool.join()     # print(res.get())     print('主程序的程序id',os.getpid())

 

  1.管道(瞭解) Pipe():  在程序之間建立一條通道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道. from multiprocessing import Process,Pipe conn1,conn2 = Pipe()  結構 主要方法: conn1.recv():接受conn2.send(obj)傳送的物件.如果沒有訊息可接受, recv方法會一直阻塞.如果連線的另一端已經關閉,那麼recv方法會跑輸EOEError. conn1.send(obj ):通過連線傳送物件.obj是與序列化相容的任意物件 from multiprocessing import Process,Pipe     def f1(conn):       from_zhujincheng = conn.recv()       print('我是子程序')     print('來自主程序的訊息:',from_zhujincheng)     if __name__ == '__main__':     conn1,conn2 = Pipe()  #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的       #可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了     p1 = Process(target=f1,args=(conn2,))     p1.start()       conn1.send('小寶貝,你在哪')       print('我是主程序') 2.訊號量  Semaphore 互斥鎖同時只允許一個執行緒更改熟路,而訊號量時同事允許一定數量的執行緒更改資料 s=Semaphore() 內部維護了一個計數器,acquire -1,release +1,為0的時候,其他的程序都要在acquire之前等待 s.acquire() 需要鎖住的程式碼 s.release() import time import random from multiprocessing import Process,Semaphore   def f1(i,s):     s.acquire()     print('%s男嘉賓到了' %i)     time.sleep(random.randint(1,3))     s.release()   if __name__ == '__main__':     s =Semaphore(4)       for i in range(10):         p = Process(target=f1,args=(i,s))           p.start() 3.事件 Event e = Event()  初識狀態是false E.wait() 當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行 E.set()  將事件物件的狀態更改為true E.is_set() 檢視狀態 E.clear() 將事件物件的狀態更改為false from multiprocessing import Process,Event     e = Event() # 建立事件物件,這個物件初識狀態為False print('e的狀態是:',e.is_set())   print('程序執行到這裡') e.set() #將e的狀態更改為True print('e的狀態是:',e.is_set())   e.clear() #將e的狀態更改為False e.wait()  #e這個事件物件如果值為False,就在我加wait的地方等待 print('程序過了wait')  基於事件的程序通訊: import time from multiprocessing import  Process,Event   def f1(e):     time.sleep(2)     n = 100     print('子程序計算結果為',n)     e.set()   if __name__ == '__main__':     e = Event()     p = Process(target=f1,args=(e,))     p.start()       print('主程序等待....')     e.wait()     print('結果已經寫入檔案,可以拿到值') 4.程序池(重點) Pool 池子 對比:多程序和多程序池的效率對比:程序的建立和銷燬時很消耗的,影響程式碼執行效率 pool.map() 引數資料必須是可迭代的,非同步提交任務,自帶close和join功能 import time from multiprocessing import Process,Pool     # def f1(n): #     time.sleep(1) #     print(n)   #對比多程序和程序池的效率 def f1(n):     for i in range(5):         n = n + i if __name__ == '__main__':     #統計程序池執行100個任務的時間     s_time = time.time()     pool = Pool(4)  #裡面這個引數是指定程序池中有多少個程序用的,4表示4個程序,如果不傳引數,預設開啟的程序數一般是cpu的個數     # pool.map(f1,[1,2])  #引數資料必須是可迭代的     pool.map(f1,range(100))  #引數資料必須是可迭代的,非同步提交任務,自帶join功能     e_time = time.time()     dif_time = e_time - s_time       #統計100個程序,來執行100個任務的執行時間     p_s_t = time.time() #多程序起始時間     p_list = []     for i in range(100):         p = Process(target=f1,args=(i,))         p.start()         p_list.append(p)         # p.join()     [pp.join() for pp in p_list]     p_e_t = time.time()     p_dif_t = p_e_t - p_s_t     print('程序池的時間:',dif_time)     print('多程序的執行時間:',p_dif_t)     # 結果:     # 程序池的時間: 0.40102291107177734     # 多程序的執行時間: 9.247529029846191 程序池的同步和非同步的方法: pool.apply()  同步方法,程序池的同步方法,將任務變成序列,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res pool.apply_async()  非同步方法,,可以直接拿到結果物件,從結果物件裡面拿到結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待. pool.close()  鎖住程序池,不在讓其他的持續往程式裡面人新任務,確保沒有新的任務交給程序池裡面的程序 join: 等待著程序池將自己裡面的任務都執行完 程序池的同步方法: import time from multiprocessing import Process,Pool     def f1(n):     time.sleep(1)     # print(n)     return n*n   if __name__ == '__main__':       pool = Pool(4)       for i in range(10):         print('xxxxxx')         res = pool.apply(f1,args=(i,))         print(res) 程序池的非同步方法:   import time from multiprocessing import Process,Pool     def f1(n):     time.sleep(0.5)     # print(n)     return n*n   if __name__ == '__main__':       pool = Pool(4)       res_list = []     for i in range(10):         print('xxxx')         #非同步給程序池提交任務         res = pool.apply_async(f1,args=(i,))         res_list.append(res)       # print('等待所有任務執行完')     # pool.close()  #鎖住程序池,意思就是不讓其他的程式再往這個程序池裡面提交任務了     # pool.join()       #列印結果,如果非同步提交之後的結果物件     for i in res_list:         print(i.get()) 程序池的同步和非同步的綜合: import time from multiprocessing import Process,Pool   def f1(n):     time.sleep(0.5)     # print(n)     return  n*n   if __name__ == '__main__':     pool =Pool(4)     # pool.apply(f1,args=(2,))  #同步方法     res_list = []     for i in range(10):         # print('任務%s' %i)         #程序池的同步方法,將任務變成序列         # res = pool.apply(f1,args=(i,))         # print(res)         #程序池的非同步方法         res =pool.apply_async(f1,args=(i,))         print(res)         # as_result =res.get()  #join的效果         # print(as_result)         res_list.append(res)       pool.close() # 鎖住程序池,不在讓其他的程式網裡面仍新的任務了,確保沒有新的任務交給程序池裡面的程序     pool.join()       for r in res_list:         print(r.get())       time.sleep(2)     #主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務     print('主程序直接結束')     p = Process(target=f1,) 5.程序池的回撥函式   Apply_async(f1,args=(i,),callback=function)   # 將前面 f1 這個任務的返回結果作為引數傳給 callback 指定的那個 function 函式 import os from multiprocessing import Pool,Process   def f1(n):     print('程序池裡面的程序id',os.getpid())     print('>>>>',n)     return n*n   def call_back_func(asdf):     print('>>>>>>>>>>>>>',os.getpid())     print('回撥函式中的結果:',asdf)     # print('回撥函式中的結果:',s.get())   if __name__ == '__main__':     pool = Pool(4)     res = pool.apply_async(f1,args=(5,),callback=call_back_func)     pool.close()     pool.join()     # print(res.get())     print('主程序的程序id',os.getpid())