1. 程式人生 > >python之路-day32-管道、資料共享、程序池

python之路-day32-管道、資料共享、程序池

 

 

一、管道(不推薦使用,瞭解即可)

  程序間通訊(IPC)方式二:管道(不推薦使用,瞭解即可),會導致資料不安全的情況出現,後面還會提到為什麼

會帶來資料不安全的問題。

 

 1 #建立管道的類:
 2 Pipe([duplex]):在程序之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道
 3 #引數介紹:
 4 dumplex:預設管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於傳送。
5 #主要方法: 6 conn1.recv():接收conn2.send(obj)傳送的物件。如果沒有訊息可接收,recv方法會一直阻塞。如果連線的另外一端已經關閉,那麼recv方法會丟擲EOFError。 7 conn1.send(obj):通過連線傳送物件。obj是與序列化相容的任意物件 8 #其他方法: 9 conn1.close():關閉連線。如果conn1被垃圾回收,將自動呼叫此方法 10 conn1.fileno():返回連線使用的整數檔案描述符 11 conn1.poll([timeout]):如果連線上的資料可用,返回True。timeout指定等待的最長時限。如果省略此引數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待資料到達。
12 13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法傳送的一條完整的位元組訊息。maxlength指定要接收的最大位元組數。如果進入的訊息,超過了這個最大值,將引發IOError異常,並且在連線上無法進行進一步讀取。如果連線的另外一端已經關閉,再也不存在任何資料,將引發EOFError異常。 14 conn.send_bytes(buffer [, offset [, size]]):通過連線傳送位元組資料緩衝區,buffer是支援緩衝區介面的任意物件,offset是緩衝區中的位元組偏移量,而size是要傳送位元組數。結果資料以單條訊息的形式發出,然後呼叫c.recv_bytes()函式進行接收
15 16 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的位元組訊息,並把它儲存在buffer物件中,該物件支援可寫入的緩衝區介面(即bytearray物件或類似的物件)。offset指定緩衝區中放置訊息處的位元組位移。返回值是收到的位元組數。如果訊息長度大於可用的緩衝區空間,將引發BufferTooShort異常。
管道介紹
 1 from multiprocessing import Process, Pipe
 2 
 3 def f(conn):
 4     conn.send("Hello 妹妹") #子程序傳送了訊息
 5     conn.close()
 6 
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe() #建立管道,拿到管道的兩端,雙工通訊方式,兩端都可以收發訊息
 9     p = Process(target=f, args=(child_conn,)) #將管道的一段給子程序
10     p.start() #開啟子程序
11     print(parent_conn.recv()) #主程序接受了訊息
12     p.join()
管道初使用
1 應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程式可能在消費者中的recv()操作上掛起(就是阻塞)。管道是由作業系統進行引用計數的,必須在所有程序中關閉管道的相同一端就會能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點
管道使用注意須知
from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引發EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()            
引發EOFError

主程序將管道的兩端都傳送給子程序,子程序和主程序共用管道的兩種報錯情況,都是在recv接收的時候報錯的:

    1.主程序和子程序中的管道的相同一端都關閉了,出現EOFError;

    2.如果你管道的一端在主程序和子程序中都關閉了,但是你還用這個關閉的一端去接收訊息,那麼就會出現OSError;

 

二、資料共享(瞭解即可)

  基於訊息傳遞的併發程式設計是大勢所趨

  即便是使用執行緒,推薦做法也是將程式設計為大量獨立的執行緒集合

  通過訊息佇列交換資料,這樣極大的減少了對使用鎖定和其他同步手段的需求,還可以擴充套件到分散式系統中

注意:程序之間應當是儘量避免通訊,即使需要通訊,也應該選擇程序安全的工具來避免加鎖帶來的問題。應該儘量避免使用

共享資料的方法,以後會嘗試使用資料庫來解決程序之間的資料共享問題

  程序之間資料共享模組之一Manager模組:

1 程序間資料是獨立的,可以藉助於佇列或管道實現通訊,二者都是基於訊息傳遞的
2 雖然程序間資料獨立,但可以通過Manager實現資料共享,事實上Manager的功能遠不止於此
3 
4 A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
5 
6 A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager模組介紹

  多程序共同去處理共享資料的時候,就和我們多程序同時去操作一個檔案中的資料一樣,不加鎖就會出現錯誤的結果,程序

不安全的,所以也需要加鎖

 1 from multiprocessing import Manager,Process,Lock
 2 def work(d,lock):
 3     with lock: #不加鎖而操作共享的資料,肯定會出現資料錯亂
 4         d['count']-=1
 5 
 6 if __name__ == '__main__':
 7     lock=Lock()
 8     with Manager() as m:
 9         dic=m.dict({'count':100})
10         p_l=[]
11         for i in range(100):
12             p=Process(target=work,args=(dic,lock))
13             p_l.append(p)
14             p.start()
15         for p in p_l:
16             p.join()
17         print(dic)
Manager模組使用

  總結下,程序之間的通訊:佇列、管道、資料共享

 

三、程序池

  multiprocess.pool 模組

  建立程序池的類:如果指定numprocess為3,則程序池會從無到有建立三個程序,然後自始至終使用這三個

程序去執行使用所有的任務(高階一些的程序池可以根據你的併發量,搞成動態增加或減少程序池中程序數量的操作),

不會開啟其他程序,提高作業系統效率,減少空間的佔用等

 1 numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值
 2 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
 3 initargs:是要傳給initializer的引數組
 4 
 5 p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
 6 '''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''
 7 
 8 p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
 9 '''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
10     
11 p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
12 
13 P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫
14 
15 方法apply_async()和map_async()的返回值是AsyncResul的例項obj。例項具有以下方法
16 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。
17 obj.ready():如果呼叫完成,返回True
18 obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常
19 obj.wait([timeout]):等待結果變為可用。
20 obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函式
Manager的方法
 1 import time
 2 from multiprocessing import Pool,Process
 3 
 4 #針對range(100)這種引數的
 5 # def func(n):
 6 #     for i in range(3):
 7 #         print(n + 1)
 8 
 9 def func(n):
10     print(n)
11     # 結果:
12     #     (1, 2)
13     #     alex
14 def func2(n):
15     for i in range(3):
16         print(n - 1)
17 if __name__ == '__main__':
18     #1.程序池的模式
19     s1 = time.time()  #我們計算一下開多程序和程序池的執行效率
20     poll = Pool(5) #建立含有5個程序的程序池
21     # poll.map(func,range(100)) #非同步呼叫程序,開啟100個任務,map自帶join的功能
22     poll.map(func,[(1,2),'alex']) #非同步呼叫程序,開啟100個任務,map自帶join的功能
23     # poll.map(func2,range(100))  #如果想讓程序池完成不同的任務,可以直接這樣搞
24     #map只限於接收一個可迭代的資料型別引數,列表啊,元祖啊等等,如果想做其他的引數之類的操作,需要用後面我們要學的方法。
25     # t1 = time.time() - s1
26     #
27     # #2.多程序的模式
28     # s2 = time.time()
29     # p_list = []
30     # for i in range(100):
31     #     p = Process(target=func,args=(i,))
32     #     p_list.append(p)
33     #     p.start()
34     # [pp.join() for pp in p_list]
35     # t2 = time.time() - s2
36     #
37     # print('t1>>',t1) #結果:0.5146853923797607s 程序池的效率高
38     # print('t2>>',t2) #結果:12.092015027999878s
程序池的簡單應用及與程序池的效率對比