1. 程式人生 > >python 管道、資料共享、程序池

python 管道、資料共享、程序池

一、管道(Pipe)(瞭解) (詳情參考:https://www.cnblogs.com/clschao/articles/9629392.html)

  程序間通訊(IPC)方式二:管道(不推薦使用,瞭解即可),會導致資料不安全的情況出現,後面我們會說到為什麼會帶來資料 不安全的問題。

 

#建立管道的類:
Pipe([duplex]):在程序之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道
#引數介紹:
dumplex:預設管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於傳送。
#主要方法: conn1.recv():接收conn2.send(obj)傳送的物件。如果沒有訊息可接收,recv方法會一直阻塞。如果連線的另外一端已經關閉,那麼recv方法會丟擲EOFError。 conn1.send(obj):通過連線傳送物件。obj是與序列化相容的任意物件 #其他方法: conn1.close():關閉連線。如果conn1被垃圾回收,將自動呼叫此方法 conn1.fileno():返回連線使用的整數檔案描述符 conn1.poll([timeout]):如果連線上的資料可用,返回True。timeout指定等待的最長時限。如果省略此引數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待資料到達。
管道基本介紹

 

  注意:管道之間不允許相同埠之間通訊,只能是一個程序conn1 <--->另一程序的conn2或者一個程序conn2 <--->另一程序的conn1進行通訊。recv未結收到訊息,會阻塞。傳送端管道埠關閉,再發送訊息,報錯EOFError,接收端堵塞,則報錯:OSError。

from multiprocessing import Pipe,Process

def func(conn1,):
    # conn2.close()
    msg = conn1.recv()
    print
('>>>>>',msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() # conn1.close() conn2.close() conn2.send('小鬼!') print('主程序結束') #EOFError #傳送端堵塞,無法傳送 #OSError #接收端堵塞,無法接受
send、recv、close

 

關於管道會造成資料不安全問題的官方解釋:
    The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
    
    由Pipe方法返回的兩個連線物件表示管道的兩端。每個連線物件都有send和recv方法(除其他之外)。注意,如果兩個程序(或執行緒)試圖同時從管道的同一端讀取或寫入資料,那麼管道中的資料可能會損壞。當然,在使用管道的不同端部的過程中不存在損壞風險。
管道官方解釋

 

二、資料共享Manager(瞭解)

  程序之間資料共享的模組之一Manager模組

程序間資料是獨立的,可以藉助於佇列或管道實現通訊,二者都是基於訊息傳遞的
雖然程序間資料獨立,但可以通過Manager實現資料共享,事實上Manager的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager簡介

  Manager使用的基本步驟;

    m=Manager()

    dic=m.dict({"name":"sbalex",}

    之後就可以貢各個程序使用,並修改

    (存在資料安全問題,慎用)

from multiprocessing import Process,Manager,Lock

def func1(dic,loc):
    with loc:
        dic["num"]-=1

if __name__ == '__main__':
    m=Manager()
    loc=Lock()
    dic=m.dict({"num":100,})
    p_lst=[]
    for i in range(10):
        p=Process(target=func1,args=(dic,loc))
        p_lst.append(p)
        p.start()

        # p.join()
    [pp.join() for pp in p_lst]
    print(dic)
Manage+Lock使用例項

 

三、程序池Pool(重點)

  

  1、為什麼要有程序池?程序池的概念。

  在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷燬程序(空間,變數,檔案資訊等等的內容)也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,維護一個很大的程序列表的同時,排程的時候,還需要進行切換並且記錄每個程序的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是作業系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。就看我們上面的一些程式碼例子,你會發現有些程式是不是執行的時候比較慢才出結果,就是這個原因,那麼我們要怎麼做呢?

  在這裡,要給大家介紹一個程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果

  總結:程序池中可以制定建立若干個程序,在程式執行的過程中,即使程序結束了也不會關閉程序,等待下一個任務進入程序池後,拿著空閒的程序繼續幹活.因此處理一大批任務的時候,只需要建立若干個程序,就可以完成所有的任務,節約了建立和銷燬程序的時間,減輕的cpu和硬碟的負擔.

 

  2、multiprocess.Pool 模組

   建立程序池的類:如果指定numprocess為3,則程序池會從無到有建立三個程序,然後自始至終使用這三個程序去執行所有任務(高階一些的程序池可以根據你的併發量,搞成動態增加或減少程序池中的程序數量的操作),不會開啟其他程序,提高作業系統效率,減少空間的佔用等。

  建立方法:

Pool([numprocess  [,initializer [, initargs]]]):建立程序池
p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
    
p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成

P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫
主要方法介紹

  (1)pool.map(func,可迭代物件) 表示可迭代物件分別把所有的值依次傳遞給func執行,並建立程序,內部有join機制

from multiprocessing import Process,Pool
import  time
def func1(i):
    num=i
    for j in range(5):
        num+=j


if __name__ == '__main__':
    #不使用程序池
    start_time=time.time()
    p_lst=[]
    for i in range(1000):
        p=Process(target=func1,args=(i,))
        p_lst.append(p)
        p.start()

    [pp.join() for pp in p_lst]
    end_time=time.time()
    print("不適用程序池",end_time-start_time)

    #引入程序池
    s_time=time.time()
    pool=Pool(8)
    pool.map(func1,range(1000)) #自動join,pool.map(函式,可迭代物件)
    e_time=time.time()
    print("使用程序池>>>",e_time-s_time)
不使用程序池和使用程序池效率對比
執行結果:
不適用程序池 36.224305391311646
使用程序池>>> 0.30788564682006836
結果顯示

  

  注意:有一點,map是非同步執行的,並且自帶close和join

    一般約定俗成的是程序池中的程序數量為CPU的數量,工作中要看具體情況來考量。

  (2)pool.apply(func,不可迭代物件) 表示給func傳參並建立程序。

      pool.apply——async(func,不可迭代物件) 表示給func傳參並建立程序。

        pool.get()  程序池執行任務返回的是物件,需要用get方法獲取返回值的內容。

        pool.close() 不是關閉程序池,而是停止程序池接受新任務,這樣才能感知程序池是否把所有任                                                   務執行完畢

        pool.join() 主程式等待程序池執行完所有任務再結束,必須在pool.close之後。

 

from multiprocessing import Process,Pool
import time
def func(i):
    num=0
    time.sleep(1)
    for j in range(5):
        num+=i
    return num
if __name__ == '__main__':
    pool=Pool(4) #程序池中建立4個程序,以後一直都是這四個程序在執行任務
    for i in range(10):
        #apply 序列執行
        #同步呼叫,直到本次任務執行完畢拿到ret,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
        ret=pool.apply(func,args=(i,))#不能傳入可迭代物件
        print(ret)
apply 同步(串列埠)執行任務
import time
from multiprocessing import Process,Pool

def func(i):
    num=0
    time.sleep(1)
    for j in range(5):
        num+=i
    # print(num)
    return num
if __name__ == '__main__':
    pool=Pool(4)
    lst=[]
    for  i in range(10):
        ret=pool.apply_async(func,args=(i,))
        # print(ret.get()) #放在for 迴圈中,建立一個程序取一個值
        lst.append(ret)
    
    pool.close() #不是關閉程序池,而是結束程序池接收任務,確保沒有新任務再提交過來。
    # 感知程序池中的任務已經執行結束,只有當沒有新的任務新增進來的時候,才能感知到任務結束了,所以在join之前必須加上close方法
    pool.join()
    # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
    [print(el.get()) for el in lst]
    
apply_async 非同步執行程序

  注意:非同步apply_async用法:如果使用非同步提交的任務,主程式需要用join,等待程序池中的任務處理完畢後才可以用get獲取結果。使用join之前,必須使用pool.close,結束程序接受任務。否則無法感知是否會有程序進入程序池,就不會知道何時程序池沒有任務。

#一:使用程序池(非同步呼叫,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
        res_l.append(res)
        # s = res.get() #如果直接用res這個結果物件呼叫get方法獲取結果的話,這個程式就變成了同步,因為get方法直接就在這裡等著你建立的程序的結果,第一個程序建立了,並且去執行了,那麼get就會等著第一個程序的結果,沒有結果就一直等著,那麼主程序的for迴圈是無法繼續的,所以你會發現變成了同步的效果
    print("==============================>") #沒有後面的join,或get,則程式整體結束,程序池中的任務還沒來得及全部執行完也都跟著主程序一起結束了

    pool.close() #關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
    pool.join()   #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>物件組成的列表,而非最終的結果,但這一步是在join後執行的,證明結果已經計算完畢,剩下的事情就是呼叫每個物件下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

#二:使用程序池(同步呼叫,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個
    print("==============================>")
    pool.close()
    pool.join()   #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法
        print(i)
詳解apply和apply_async

 

  #程序池版socket聊天程式碼:

#Pool內的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個程序內檢視pid,會發現pid使用為4個,即多個客戶端公用4個程序
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('程序pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

server端:tcp_server.py
伺服器
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
客戶端

  

  (3)回撥函式

需要回調函式的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函式去處理該結果,該函式即回撥函式,這是程序池特有的,普通程序沒有這個機制,但是我們也可以通過程序通訊來拿到返回值,程序池的這個回撥也是程序通訊的機制完成的。

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函式(主程序負責執行),這樣主程序在執行回撥函式時就省去了I/O的過程,直接拿到的是任務的結果
回撥函式簡介

  注意:回撥函式是在主程序執行的,但是如果主程序未接受到pool.apply_async函式內部的的回撥資訊,則可能會提前關閉子程序.

import time
from multiprocessing import Pool,Process

import os

def func(n):
    # print('xxxxxxxxxx')
    print('子程序的pid :',os.getpid())
    return n*n,'約嗎'

def call_back_func(x):
    # print(x)  #(9, '約嗎')
    print('call_back pid ::',os.getpid())
    print(x[0])

if __name__ == '__main__':
    pool = Pool(4)
    pool.apply_async(func,args=(3,),callback=call_back_func)

    print('主程序的pid:',os.getpid())
    pool.close()
    pool.join()
驗證回撥函式在主程序執行

  程式碼顯示:回撥函式與主程序函式的pid號一致,說明了回撥函式在主程序中執行.