1. 程式人生 > >Python併發程式設計(管道,資料共享,訊號量,程序池)

Python併發程式設計(管道,資料共享,訊號量,程序池)

管道(瞭解)

    程序間通訊(IPC)方式二:管道(不推薦使用,瞭解即可),會導致資料不安全的情況出現,後面我們會說到為什麼會帶來資料 不安全的問題。

#建立管道的類:
Pipe([duplex]):在程序之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道
#引數介紹:
dumplex:預設管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於傳送。
#主要方法:
    conn1.recv():接收conn2.send(obj)傳送的物件。如果沒有訊息可接收,recv方法會一直阻塞。如果連線的另外一端已經關閉,那麼recv方法會丟擲EOFError。
    conn1.send(obj):通過連線傳送物件。obj是與序列化相容的任意物件
 
#其他方法: conn1.close():關閉連線。如果conn1被垃圾回收,將自動呼叫此方法 conn1.fileno():返回連線使用的整數檔案描述符 conn1.poll([timeout]):如果連線上的資料可用,返回True。timeout指定等待的最長時限。如果省略此引數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待資料到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法傳送的一條完整的位元組訊息。maxlength指定要接收的最大位元組數。如果進入的訊息,超過了這個最大值,將引發IOError異常,並且在連線上無法進行進一步讀取。如果連線的另外一端已經關閉,再也不存在任何資料,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連線傳送位元組資料緩衝區,buffer是支援緩衝區介面的任意物件,offset是緩衝區中的位元組偏移量,而size是要傳送位元組數。結果資料以單條訊息的形式發出,然後呼叫c.recv_bytes()函式進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的位元組訊息,並把它儲存在buffer物件中,該物件支援可寫入的緩衝區介面(即bytearray物件或類似的物件)。offset指定緩衝區中放置訊息處的位元組位移。返回值是收到的位元組數。如果訊息長度大於可用的緩衝區空間,將引發BufferTooShort異常。 管道介紹
from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello 妹妹") #子程序傳送了訊息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() #建立管道,拿到管道的兩端,雙工通訊方式,兩端都可以收發訊息
    p = Process(target=f, args=(child_conn,)) #將管道的一段給子程序
    p.start() #開啟子程序
    print
(parent_conn.recv()) #主程序接受了訊息 p.join() 管道初使用

 

  應該特別注意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程式可能在消費者中的recv()操作上掛起(就是阻塞)。管道是由作業系統進行引用計數的,必須在所有程序中關閉管道的相同一端就會能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 

from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引發EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.close()
    p.join()            

引發EOFError

    主程序將管道的兩端都傳送給子程序,子程序和主程序共用管道的兩種報錯情況,都是在recv接收的時候報錯的:

    1.主程序和子程序中的管道的相同一端都關閉了,出現EOFError;

    2.如果你管道的一端在主程序和子程序中都關閉了,但是你還用這個關閉的一端去接收訊息,那麼就會出現OSError;

 

    所以你關閉管道的時候,就容易出現問題,需要將所有隻用這個管道的程序中的兩端全部關閉才行。當然也可以通過異常捕獲(try:except EOFerror)來處理。

    雖然我們在主程序和子程序中都列印了一下conn1一端的物件,發現兩個不再同一個地址,但是子程序中的管道和主程序中的管道還是可以通訊的,因為管道是同一套,系統能夠記錄。    

 

    我們的目的就是關閉所有的管道,那麼主程序和子程序進行通訊的時候,可以給子程序傳管道的一端就夠了,並且用我們之前學到的,資訊傳送完之後,再發送一個結束訊號None,那麼你收到的訊息為None的時候直接結束接收或者說結束迴圈,就不用每次都關閉各個程序中的管道了。

from multiprocessing import Pipe,Process

def func(conn):
    while True:
        msg = conn.recv()
        if msg is None:break
        print(msg)

if __name__ == '__main__':
    conn1,conn2 = Pipe()
    p = Process(target=func,args=(conn1,))
    p.start()
    for i in range(10):
        conn2.send('約吧')
    conn2.send(None)

通過結束訊號None來結束程式
from multiprocessing import Process,Pipe

def consumer(p,name):
    produce, consume=p
    produce.close()
    while True:
        try:
            baozi=consume.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            break

def producer(seq,p):
    produce, consume=p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == '__main__':
    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(produce,consume))

    produce.close()
    consume.close()

    c1.join()
    print('主程序')

通過管道來實現生產者消費者模型
關於管道會造成資料不安全問題的官方解釋:
    The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
    
    由Pipe方法返回的兩個連線物件表示管道的兩端。每個連線物件都有send和recv方法(除其他之外)。注意,如果兩個程序(或執行緒)試圖同時從管道的同一端讀取或寫入資料,那麼管道中的資料可能會損壞。當然,在使用管道的不同端部的過程中不存在損壞風險。
from multiprocessing import Process,Pipe,Lock

def consumer(p,name,lock):
    produce, consume=p
    produce.close()
    while True:
        lock.acquire()
        baozi=consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' %(name,baozi))
        else:
            consume.close()
            break


def producer(p,n):
    produce, consume=p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == '__main__':
    produce,consume=Pipe()
    lock = Lock()
    c1=Process(target=consumer,args=((produce,consume),'c1',lock))
    c2=Process(target=consumer,args=((produce,consume),'c2',lock))
    p1=Process(target=producer,args=((produce,consume),10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主程序')

多個消費者競爭會出現資料不安全的問題的解決方案:加鎖

     管道可以用於雙工通訊,通常利用在客戶端/服務端中使用的請求/響應模型,或者遠端過程呼叫,就可以使用管道編寫與程序互動的程式,像前面將網路通訊的時候,我們使用了一個叫subprocess的模組,裡面有個引數是pipe管道,執行系統指令,並通過管道獲取結果。

 

資料共享(瞭解)

    展望未來,基於訊息傳遞的併發程式設計是大勢所趨

    即便是使用執行緒,推薦做法也是將程式設計為大量獨立的執行緒集合

    通過訊息佇列交換資料。這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴充套件到分散式系統中

    程序間應該儘量避免通訊,即便需要通訊,也應該選擇程序安全的工具來避免加鎖帶來的問題,應該儘量避免使用本節所講的共享資料的方式,以後我們會嘗試使用資料庫來解決程序之間的資料共享問題。

    程序之間資料共享的模組之一Manager模組:

程序間資料是獨立的,可以藉助於佇列或管道實現通訊,二者都是基於訊息傳遞的
雖然程序間資料獨立,但可以通過Manager實現資料共享,事實上Manager的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

Manager模組介紹

   多程序共同去處理共享資料的時候,就和我們多程序同時去操作一個檔案中的資料是一樣的,不加鎖就會出現錯誤的結果,程序不安全的,所以也需要加鎖

from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操作共享的資料,肯定會出現資料錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

Manager模組使用

 

總結一下,程序之間的通訊:佇列、管道、資料共享也算

下面要講的訊號量和事件也相當於鎖,也是全域性的,所有程序都能拿到這些鎖的狀態,程序之間這些鎖啊訊號量啊事件啊等等的通訊,其實底層還是socekt,只不過是基於檔案的socket通訊,而不是跟上面的資料共享啊空間共享啊之類的機制,我們之前學的是基於網路的socket通訊,還記得socket的兩個家族嗎,一個檔案的一個網路的,所以將來如果說這些鎖之類的報錯,可能你看到的就是類似於socket的錯誤,簡單知道一下就可以啦~~~

工作中常用的是鎖,訊號量和事件不常用,但是訊號量和事件面試的時候會問到,你能知道就行啦~~~

 

訊號量(瞭解)

互斥鎖同時只允許一個執行緒更改資料,而訊號量Semaphore是同時允許一定數量的執行緒更改資料 。
假設商場裡有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。這是迪科斯徹(Dijkstra)訊號量概念P()和V()的Python實現。訊號量同步機制適用於訪問像伺服器這樣的有限資源。
訊號量與程序池的概念很像,但是要區分開,訊號量涉及到加鎖的概念

   比如大保健:提前設定好,一個房間只有4個床(計數器現在為4),那麼同時只能四個人進來,誰先來的誰先佔一個床(acquire,計數器減1),4個床滿了之後(計數器為0了),第五個人就要等著,等其中一個人出來(release,計數器加1),他就去佔用那個床了。

from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print('%s 佔到一間ktv小屋' %user)
    time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

訊號量使用

事件(瞭解)

python執行緒的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。

clear:將“Flag”設定為False
set:將“Flag”設定為True
from multiprocessing import Process,Semaphore,Event
import time,random

e = Event() #建立一個事件物件
print(e.is_set())  #is_set()檢視一個事件的狀態,預設為False,可通過set方法改為True
print('look here!')
# e.set()          #將is_set()的狀態改為True。
# print(e.is_set())#is_set()檢視一個事件的狀態,預設為False,可通過set方法改為Tr
# e.clear()        #將is_set()的狀態改為False
# print(e.is_set())#is_set()檢視一個事件的狀態,預設為False,可通過set方法改為Tr
e.wait()           #根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那麼就阻塞,is_set()=True就不阻塞
print('give me!!')

#set和clear  修改事件的狀態 set-->True   clear-->False
#is_set     用來檢視一個事件的狀態
#wait       依據事件的狀態來決定是否阻塞 False-->阻塞  True-->不阻塞

事件方法的使用
from multiprocessing import Process, Event
import time, random

def car(e, n):
    while True:
        if not e.is_set():  # 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
            print('\033[31m紅燈亮\033[0m,car%s等著' % n)
            e.wait()    # 阻塞,等待is_set()的值變成True,模擬訊號燈為綠色
            print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
            time.sleep(random.randint(2,4))
            if not e.is_set():   #如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
                continue
            print('車開遠了,car', n)
            break

# def police_car(e, n):
#     while True:
#         if not e.is_set():# 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
#             print('\033[31m紅燈亮\033[0m,car%s等著' % n)
#             e.wait(0.1) # 阻塞,等待設定等待時間,等待0.1s之後沒有等到綠燈就闖紅燈走了
#             if not e.is_set():
#                 print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
#             else:
#                 print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
#         break

def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print('######', e.is_set())
            e.clear()  # ---->將is_set()的值設定為False
        else:
            e.set()    # ---->將is_set()的值設定為True
            print('***********',e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 建立10個程序控制10輛車
        time.sleep(random.random(1, 3))    #車不是一下子全過來
        p.start()

    # for i in range(5):
    #     p = Process(target=police_car, args=(e, i,))  # 建立5個程序控制5輛警車
    #     p.start()

    #訊號燈必須是單獨的程序,因為它不管你車開到哪了,我就按照我紅綠燈的規律來閃爍變換,對吧
    t = Process(target=traffic_lights, args=(e, 5))  # 建立一個程序控制紅綠燈
    t.start()

    print('預備~~~~開始!!!')

通過事件來模擬紅綠燈示例

程序池和mutiprocess.Poll

為什麼要有程序池?程序池的概念。

  在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷燬程序(空間,變數,檔案資訊等等的內容)也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,維護一個很大的程序列表的同時,排程的時候,還需要進行切換並且記錄每個程序的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是作業系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。就看我們上面的一些程式碼例子,你會發現有些程式是不是執行的時候比較慢才出結果,就是這個原因,那麼我們要怎麼做呢?

  在這裡,要給大家介紹一個程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果

multiprocess.Poll模組

   建立程序池的類:如果指定numprocess為3,則程序池會從無到有建立三個程序,然後自始至終使用這三個程序去執行所有任務(高階一些的程序池可以根據你的併發量,搞成動態增加或減少程序池中的程序數量的操作),不會開啟其他程序,提高作業系統效率,減少空間的佔用等。

   概念介紹:

Pool([numprocess  [,initializer [, initargs]]]):建立程序池
numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值
initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
initargs:是要傳給initializer的引數組
p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
    
p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成

P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫

主要方法介紹
方法apply_async()和map_async()的返回值是AsyncResul的例項obj。例項具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。
obj.ready():如果呼叫完成,返回True
obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函式
import time
from multiprocessing import Pool,Process

#針對range(100)這種引數的
# def func(n):
#     for i in range(3):
#         print(n + 1)

def func(n):
    print(n)
    # 結果:
    #     (1, 2)
    #     alex
def func2(n):
    for i in range(3):
        print(n - 1)
if __name__ == '__main__':
    #1.程序池的模式
    s1 = time.time()  #我們計算一下開多程序和程序池的執行效率
    poll = Pool(5) #建立含有5個程序的程序池
    # poll.map(func,range(100)) #非同步呼叫程序,開啟100個任務,map自帶join的功能
    poll.map(func,[(1,2),'alex']) #非同步呼叫程序,開啟100個任務,map自帶join的功能
    # poll.map(func2,range(100))  #如果想讓程序池完成不同的任務,可以直接這樣搞
    #map只限於接收一個可迭代的資料型別引數,列表啊,元祖啊等等,如果想做其他的引數之類的操作,需要用後面我們要學的方法。
    # t1 = time.time() - s1
    #
    # #2.多程序的模式
    # s2 = time.time()
    # p_list = []
    # for i in range(100):
    #     p = Process(target=func,args=(i,))
    #     p_list.append(p)
    #     p.start()
    # [pp.join() for pp in p_list]
    # t2 = time.time() - s2
    #
    # print('t1>>',t1) #結果:0.5146853923797607s 程序池的效率高
    # print('t2>>',t2) #結果:12.092015027999878s

程序池的簡單應用及與程序池的效率對比

  有一點,map是非同步執行的,並且自帶close和join

  一般約定俗成的是程序池中的程序數量為CPU的數量,工作中要看具體情況來考量。

 

  實際應用程式碼示例:

  同步與非同步兩種執行方式:

import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(1)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步呼叫,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
                                    # 但不管該任務是否存在阻塞,同步呼叫都會在原地等著
        res_l.append(res)
    print(res_l)

程序池的同步呼叫
import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 非同步執行,根據程序池中有的程序數,每次最多3個子程序在非同步執行,並且可以執行不同的任務,傳送任意的引數了。
                                          # 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務
                                          # 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束
                                          # 而是執行完一個就釋放一個程序,這個程序就去接收新的任務。  
        res_l.append(res)

    # 非同步apply_async用法:如果使用非同步提交的任務,主程序需要使用join,等待程序池內任務都處理完,然後可以用get收集結果
    # 否則,主程序結束,程序池可能還沒來得及執行,也就跟著一起結束了
    p.close() #不是關閉程序池,而是結束程序池接收任務,確保沒有新任務再提交過來。
    p.join()   #感知程序池中的任務已經執行結束,只有當沒有新的任務新增進來的時候,才能感知到任務結束了,所以在join之前必須加上close方法
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

程序池的非同步呼叫
#一:使用程序池(非同步呼叫,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
        res_l.append(res)
        # s = res.get() #如果直接用res這個結果物件呼叫get方法獲取結果的話,這個程式就變成了同步,因為get方法直接就在這裡等著你建立的程序的結果,第一個程序建立了,並且去執行了,那麼get就會等著第一個程序的結果,沒有結果就一直等著,那麼主程序的for迴圈是無法繼續的,所以你會發現變成了同步的效果
    print("==============================>") #沒有後面的join,或get,則程式整體結束,程序池中的任務還沒來得及全部執行完也都跟著主程序一起結束了

    pool.close() #關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
    pool.join()   #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>物件組成的列表,而非最終的結果,但這一步是在join後執行的,證明結果已經計算完畢,剩下的事情就是呼叫每個物件下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

#二:使用程序池(同步呼叫,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個
    print("==============================>")
    pool.close()
    pool.join()   #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法
        print(i)

詳解:apply_async和apply

    程序池版的socket併發聊天程式碼示例:

#Pool內的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個程序內檢視pid,會發現pid使用為4個,即多個客戶端公用4個程序
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn):
    print('程序pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

server端:tcp_server.py
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))

client端:tcp_client.py

   發現:併發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.

   同時最多和4個人進行聊天,因為程序池中只有4個程序可供呼叫,那有同學會問,我們這麼多人想同時聊天怎麼辦,又不讓用多程序,程序池也不能開太多的程序,那咋整啊,後面我們會學到多執行緒,到時候大家就知道了,現在你們先這樣記住就好啦

 

   然後我們再提一個回撥函式

需要回調函式的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函式去處理該結果,該函式即回撥函式,這是程序池特有的,普通程序沒有這個機制,但是我們也可以通過程序通訊來拿到返回值,程序池的這個回撥也是程序通訊的機制完成的。

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函式(主程序負責執行),這樣主程序在執行回撥函式時就省去了I/O的過程,直接拿到的是任務的結果
import os
from multiprocessing import Pool

def func1(n):
    print('func1>>',os.getpid())
    print('func1')
    return n*n

def func2(nn):
    print('func2>>',os.getpid())
    print('func2')
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == '__main__':
    print('主程序:',os.getpid())
    p = Pool(5)
    #args裡面的10給了func1,func1的返回值作為回撥函式的引數給了callback對應的函式,不能直接給回撥函式直接傳引數,他只能是你任務函式func1的函式的返回值
    # for i in range(10,20): #如果是多個程序來執行任務,那麼當所有子程序將結果給了回撥函式之後,回撥函式又是在主程序上執行的,那麼就會出現列印結果是同步的效果。我們上面func2裡面登出的時間模組開啟看看
    #     p.apply_async(func1,args=(i,),callback=func2)
    p.apply_async(func1,args=(10,),callback=func2)

    p.close()
    p.join()

#結果
# 主程序: 11852  #發現回撥函式是在主程序中完成的,其實如果是在子程序中完成的,那我們直接將程式碼寫在子程序的任務函式func1裡面就行了,對不對,這也是為什麼稱為回撥函式的原因。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100

回撥函式的簡單使用

  回撥函式在寫的時候注意一點,回撥函式的形參執行有一個,如果你的執行函式有多個返回值,那麼也可以被回撥函式的這一個形參接收,接收的是一個元祖,包含著你執行函式的所有返回值。

  

  使用程序池來搞爬蟲的時候,最耗時間的是請求地址的網路請求延遲,那麼如果我們在將處理資料的操作加到每個子程序中,那麼所有在程序池後面排隊的程序就需要等更長的時間才能獲取程序池裡面的執行程序來執行自己,所以一般我們就將請求作成一個執行函式,通過程序池去非同步執行,剩下的資料處理的內容放到另外一個程序或者主程序中去執行,將網路延遲的時間也利用起來,效率更高。

  requests這個模組的get方法請求頁面,就和我們在瀏覽器上輸入一個網址然後回車去請求別人的網站的效果是一樣的。安裝requests模組的指令:在cmd視窗執行pip install requests。

import requests
response = requests.get('http://www.baidu.com')
print(response)
print(response.status_code) #200正常,404找不到網頁,503等5開頭的是人家網站內部錯誤
print(response.content.decode('utf-8'))
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<程序%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<程序%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回撥函式處理了

'''
列印結果:
<程序3388> get https://www.baidu.com
<程序3389> get https://www.python.org
<程序3390> get https://www.openstack.org
<程序3388> get https://help.github.com/
<程序3387> parse https://www.baidu.com
<程序3389> get http://www.sina.com.cn/
<程序3387> parse https://www.python.org
<程序3387> parse https://help.github.com/
<程序3387> parse http://www.sina.com.cn/
<程序3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''

使用多程序請求多個url來減少網路等待浪費的時間
from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))

爬蟲示例

 

  如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函式

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待程序池中所有程序執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有結果
    print(nums) #主程序拿到所有的處理結果,可以在主程序中進行統一進行處理

無需回撥函式的示例

 程序池是多個需要被執行的任務在程序池外面排隊等待獲取程序物件去執行自己,而訊號量是一堆程序等待著去執行一段邏輯程式碼。

  訊號量不能控制建立多少個程序,但是可以控制同時多少個程序能夠執行,但是程序池能控制你可以建立多少個程序。

  舉例:就像那些開大車拉煤的,訊號量是什麼呢,就好比我只有五個車道,你每次只能過5輛車,但是不影響你建立100輛車,但是程序池相當於什麼呢?相當於你只有5輛車,每次5個車拉東西,拉完你再把車放回來,給別的人拉煤用。

  其他語言裡面有更高階的程序池,在設定的時候,可以將程序池中的程序動態的創建出來,當需求增大的時候,就會自動在程序池中新增程序,需求小的時候,自動減少程序,並且可以設定程序數量的上線,最多為多,python裡面沒有。

程序池的其他實現方式:https://docs.python.org/dev/library/concurrent.futures.html