1. 程式人生 > >python網路程式設計--管道,訊號量,Event,程序池,回撥函式

python網路程式設計--管道,訊號量,Event,程序池,回撥函式

1.管道
 

 加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行任務修改,即序列修改,速度慢了,但犧牲了速度卻保證了資料安全.
 檔案共享資料實現程序間的通訊,但問題是:
    1.效率低(共享資料基於檔案,而檔案是硬碟上的資料)
    2.需要自己加鎖處理
 而使用multiprocess模組為我們提供的基於訊息IPC通訊機制:通訊和管道 可以幫我們解決這兩個問題.
 佇列和管道都是將資料存放於記憶體內,而佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來來,因而佇列才是程序間通訊的最佳選擇
 我們應該儘量避免使用共享資料,儘可能的使用訊息傳遞和佇列,避免初拉力複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可擴充套件性.
 格式:
    conn1,conn2 = Pipe()
    conn1.recv()
    conn1.send()
    資料接收一次就沒有了

示例程式碼:
        from multiprocessing import Process

        def f1(conn):
            from_zhujincheng  = conn.recv()
            print('我是子程序')
            print('我是來自主程序的訊息>>>',from_zhujincheng)

        if __name__ == '__main__':

            conn1,conn2 = Pipe()
            p1 = Process(target=f1, args=(conn2,))
            p1.start()
            conn1.send('你好')
            print('我是主程序')
    結果:
        我是主程序
        我是子程序
        我是來自主程序的訊息>>> 你好

    案例:

from multiprocessing import Process,Pipe


def f1(conn):

    from_zhujincheng = conn.recv()

    print('我是子程序')
    print('來自主程序的訊息:',from_zhujincheng)


if __name__ == '__main__':
    conn1,conn2 = Pipe()  #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的

    #可以將一端或者兩端傳送給其他的程序,那麼多個程序之間就可以通過這一個管道進行通訊了
    p1 = Process(target=f1,args=(conn2,))
    p1.start()

    conn1.send('你在哪')

    print('我是主程序')

  

2.事件Event

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測.如果程式中的其他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這是執行緒同步問題就會變得非常棘手.為了解決這些問題,我們需要使用Event物件.物件包含一個可由執行緒設定的訊號標誌,它允許執行緒或程序等待某些事件的發生.在初始情況下,Event物件中的訊號標誌被設定為假(False).
    e = Event()  #初識狀態是false
    e.wait()  當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行
    e.set()  將事件物件的狀態改為true,
    e.is_set() 檢視狀態
    e.clear()  將事件物件的狀態改為false

   基於事件的程序間通訊:

import time
from multiprocessing import Process,Event


def f1(e):
    time.sleep(2)
    n = 100
    print('子程序計算結果為',n)
    e.set()


if __name__ == '__main__':
    e = Event()

    p = Process(target=f1,args=(e,))
    p.start()

    print('主程序等待...')
    e.wait()
    print('結果已經寫入檔案了,可以拿到這值')

  

   示例:

from multiprocessing import Process,Event

e = Event()  #建立事件物件,這個物件的初識狀態為False
print('e的狀態是:',e.is_set())

print('程序執行到這裡了')
e.set()  #將e的狀態改為True
print('e的狀態是:',e.is_set())

e.clear()  #將e的狀態改為False

e.wait()  #e這個事件物件如果值為False,就在我加wait的地方等待

print('程序過了wait')

3.訊號量(Semaphore)

訊號量也是一把鎖,可以指定訊號量為5, 對比互斥鎖同一時間只能有一個任務搶到鎖去執行,訊號量同一時間可以有5個任務拿到鎖去執行,如果說互斥鎖是合租房屋的人去搶一個廁所,那麼訊號量就相當於一群路人去爭搶公共廁所,公共廁所有多個坑位,這意味著同一時間可以有多個人上公共廁所,但公共廁所容納的人數是一定的,這便是訊號量的大小.
Semaphore管理一個內建的計數器
每當呼叫acquire時內建計數器-1
呼叫release()時內建計數器+1
計數器不能小於0,當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()


格式:
        s = Semphore(4) # 內部維護了一個計數器,acquire-1,release+1,為0的時候,其他的程序都要在acquire之前等待

        s.acquire()
        需要鎖住的程式碼
        s.release()
    
   示例:
import time
import random
from multiprocessing import Process,Semaphore

def f1(i,s):
    s.acquire()

    print('%s男嘉賓到了'%i)
    time.sleep(random.randint(1,3))
    s.release()

if __name__ == '__main__':
    s = Semaphore(4)  #計數器4,acquire一次減一,為0 ,其他人等待,release加1,
    for i in range(10):
        p = Process(target=f1,args=(i,s))

        p.start()

  

4.程序池

程序的建立和銷燬是很有消耗的,影響程式碼執行效率

程序池:
方法:
    map:非同步提交任務,並且傳參需要可迭代型別的資料,自帶close和join功能
    res = Apply(f1,args=(i,))  #同步執行任務,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res
    res_obj = Apply_async(f1,args=(i,))  #非同步提交任務,可以直接拿到結果物件,從結果物件裡面拿結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待
    close : 鎖住程序池,防止有其他的新的任務在提交給程序池
    Join : 等待著程序池將自己裡面的任務都執行完  

   map方法使用:

def f1(n):
    for i in range(5):
        n = n + i
if __name__ == '__main__':
    #統計程序池執行100個任務的時間
    s_time = time.time()
    pool = Pool(4)  #裡面這個引數是指定程序池中有多少個程序用的,4表示4個程序,如果不傳引數,預設開啟的程序數一般是cpu的個數
    # pool.map(f1,[1,2])  #引數資料必須是可迭代的
    pool.map(f1,range(100))  #引數資料必須是可迭代的,非同步提交任務,自帶join功能

  程序池的非同步方法和同步方法時間比較(非同步效率更高)

  程序池同步方法:

import time
from multiprocessing import Process,Pool

def f1(n):
    time.sleep(1)
    # print(n)
    return n*n

if __name__ == '__main__':

    pool = Pool(4)

    for i in range(10):
        print('xxxxxx')
        res = pool.apply(f1,args=(i,))
        print(res)

  程序池非同步方法:

import time
from multiprocessing import Process,Pool


def f1(n):
    time.sleep(0.5)
    # print(n)
    return n*n

if __name__ == '__main__':

    pool = Pool(4)

    res_list = []
    for i in range(10):
        print('xxxx')
        #非同步給程序池提交任務
        res = pool.apply_async(f1,args=(i,))
        res_list.append(res)

    # print('等待所有任務執行完')
    # pool.close()  #鎖住程序池,意思就是不讓其他的程式再往這個程序池裡面提交任務了
    # pool.join()

    #列印結果,如果非同步提交之後的結果物件
    for i in res_list:
        print(i.get())

    # time.sleep(10)

  程序池同步方法與非同步方法的時間比較

import time
from multiprocessing import Process,Pool


def f1(n):
    time.sleep(0.5)
    # print(n)
    return n*n

if __name__ == '__main__':

    pool = Pool(4)

    res_list = []
    for i in range(10):
        print('xxxx')
        #非同步給程序池提交任務
        res = pool.apply_async(f1,args=(i,))
        res_list.append(res)

    # print('等待所有任務執行完')
    # pool.close()  #鎖住程序池,意思就是不讓其他的程式再往這個程序池裡面提交任務了
    # pool.join()

    #列印結果,如果非同步提交之後的結果物件
    for i in res_list:
        print(i.get())

    # time.sleep(10)

結果
    程序池用的時間 0.5779643058776855
    主程序
    >>>多程序的執行時間 1547113644.185883

 

5.回撥函式

  可以為程序池內的每個程序繫結一個函式,該函式在程序或執行緒的任務執行完畢後自動觸發,並接收任務的返回值當作引數,該函式稱為回撥函式
apply_async(f1,args=(i,),callback=function)  #將前面f1這個任務的返回結果作為引數傳給callback指定的那個function函式

  示例:

import os
from multiprocessing import Pool,Process

def f1(n):
    print('程序池裡面的程序id',os.getpid())
    print('>>>>',n)
    return n*n

def call_back_func(asdf):
    print('>>>>>>>>>>>>>',os.getpid())
    print('回撥函式中的結果:',asdf)
    # print('回撥函式中的結果:',s.get())

if __name__ == '__main__':
    pool = Pool(4)
    res = pool.apply_async(f1,args=(5,),callback=call_back_func)
    pool.close()
    pool.join()
    # print(res.get())
    print('主程序的程序id',os.getpid())