1. 程式人生 > >python D32 管道、線程池

python D32 管道、線程池

127.0.0.1 opened 便是 aid cal star 數據安全 第一個 多任務

管道:

Pipe

Conn1,conn2 = Pipe()

數據共享:

M = Manager()

Dic = m.dict({‘name’:sbalex})

數據安全的問題

加鎖

進程池 *****

Map:異步提交任務,參數是可叠代對象,自帶close + join

Apply :同步提交任務,直接可以收到返回值

Apply_async() 異步提交任務:res.get() 阻塞效果

Close join

回調函數:callback=

一、管道

 進程間通信(IPC)方式二:管道(不推薦使用,了解即可),會導致數據不安全的情況出現,後面我們會說到為什麽會帶來數據 不安全的問題。

技術分享圖片 技術分享圖片
#創建管道的類:
Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那麽recv方法會拋出EOFError。
    conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象
 #其他方法:
conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回連接使用的整數文件描述符
conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。
技術分享圖片

技術分享圖片 技術分享圖片
from multiprocessing import Process, Pipe

def f(conn):
    conn.send("Hello 妹妹") #子進程發送了消息
    conn.close()

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe() #建立管道,拿到管道的兩端,雙工通信方式,兩端都可以收發消息
    p = Process(target=f, args=(child_conn,)) #將管道的一段給子進程
    p.start() #開啟子進程
    print(parent_conn.recv()) #主進程接受了消息
    p.join()
技術分享圖片

    技術分享圖片

    應該特別註意管道端點的正確管理問題。如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起(就是阻塞)。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道的相同一端就會能生成EOFError異常。因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。   

技術分享圖片 技術分享圖片
from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    #parent_conn.close() #不寫close將不會引發EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send(‘hello‘)
    parent_conn.close()
    p.join()            
技術分享圖片

    主進程將管道的兩端都傳送給子進程,子進程和主進程共用管道的兩種報錯情況,都是在recv接收的時候報錯的:

    1.主進程和子進程中的管道的相同一端都關閉了,出現EOFError;

    2.如果你管道的一端在主進程和子進程中都關閉了,但是你還用這個關閉的一端去接收消息,那麽就會出現OSError;

    所以你關閉管道的時候,就容易出現問題,需要將所有只用這個管道的進程中的兩端全部關閉才行。當然也可以通過異常捕獲(try:except EOFerror)來處理。

    雖然我們在主進程和子進程中都打印了一下conn1一端的對象,發現兩個不再同一個地址,但是子進程中的管道和主進程中的管道還是可以通信的,因為管道是同一套,系統能夠記錄。    

    我們的目的就是關閉所有的管道,那麽主進程和子進程進行通信的時候,可以給子進程傳管道的一端就夠了,並且用我們之前學到的,信息發送完之後,再發送一個結束信號None,那麽你收到的消息為None的時候直接結束接收或者說結束循環,就不用每次都關閉各個進程中的管道了。

技術分享圖片 技術分享圖片
from multiprocessing import Pipe,Process

def func(conn):
    while True:
        msg = conn.recv()
        if msg is None:break
        print(msg)

if __name__ == ‘__main__‘:
    conn1,conn2 = Pipe()
    p = Process(target=func,args=(conn1,))
    p.start()
    for i in range(10):
        conn2.send(‘約吧‘)
    conn2.send(None)
技術分享圖片

技術分享圖片 技術分享圖片
from multiprocessing import Process,Pipe

def consumer(p,name):
    produce, consume=p
    produce.close()
    while True:
        try:
            baozi=consume.recv()
            print(‘%s 收到包子:%s‘ %(name,baozi))
        except EOFError:
            break

def producer(seq,p):
    produce, consume=p
    consume.close()
    for i in seq:
        produce.send(i)

if __name__ == ‘__main__‘:
    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),‘c1‘))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(produce,consume))

    produce.close()
    consume.close()

    c1.join()
    print(‘主進程‘)
技術分享圖片

    

   

技術分享圖片
關於管道會造成數據不安全問題的官方解釋:
    The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
    
由Pipe方法返回的兩個連接對象表示管道的兩端。每個連接對象都有send和recv方法(除其他之外)。註意,如果兩個進程(或線程)試圖同時從管道的同一端讀取或寫入數據,那麽管道中的數據可能會損壞。當然,在使用管道的不同端部的過程中不存在損壞風險。
技術分享圖片

技術分享圖片 技術分享圖片
from multiprocessing import Process,Pipe,Lock

def consumer(p,name,lock):
    produce, consume=p
    produce.close()
    while True:
        lock.acquire()
        baozi=consume.recv()
        lock.release()
        if baozi:
            print(‘%s 收到包子:%s‘ %(name,baozi))
        else:
            consume.close()
            break


def producer(p,n):
    produce, consume=p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()

if __name__ == ‘__main__‘:
    produce,consume=Pipe()
    lock = Lock()
    c1=Process(target=consumer,args=((produce,consume),‘c1‘,lock))
    c2=Process(target=consumer,args=((produce,consume),‘c2‘,lock))
    p1=Process(target=producer,args=((produce,consume),10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print(‘主進程‘)
技術分享圖片

    管道可以用於雙工通信,通常利用在客戶端/服務端中使用的請求/響應模型,或者遠程過程調用,就可以使用管道編寫與進程交互的程序,像前面將網絡通信的時候,我們使用了一個叫subprocess的模塊,裏面有個參數是pipe管道,執行系統指令,並通過管道獲取結果。

7.數據共享(了解)

    展望未來,基於消息傳遞的並發編程是大勢所趨

    即便是使用線程,推薦做法也是將程序設計為大量獨立的線程集合

    通過消息隊列交換數據。這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴展到分布式系統中

    進程間應該盡量避免通信,即便需要通信,也應該選擇進程安全的工具來避免加鎖帶來的問題,應該盡量避免使用本節所講的共享數據的方式,以後我們會嘗試使用數據庫來解決進程之間的數據共享問題。

    進程之間數據共享的模塊之一Manager模塊:

技術分享圖片 技術分享圖片
進程間數據是獨立的,可以借助於隊列或管道實現通信,二者都是基於消息傳遞的
雖然進程間數據獨立,但可以通過Manager實現數據共享,事實上Manager的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
技術分享圖片

  多進程共同去處理共享數據的時候,就和我們多進程同時去操作一個文件中的數據是一樣的,不加鎖就會出現錯誤的結果,進程不安全的,所以也需要加鎖

技術分享圖片 技術分享圖片
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操作共享的數據,肯定會出現數據錯亂
        d[‘count‘]-=1

if __name__ == ‘__main__‘:
    lock=Lock()
    with Manager() as m:
        dic=m.dict({‘count‘:100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
技術分享圖片

總結一下,進程之間的通信:隊列、管道、數據共享也算

下面要講的信號量和事件也相當於鎖,也是全局的,所有進程都能拿到這些鎖的狀態,進程之間這些鎖啊信號量啊事件啊等等的通信,其實底層還是socekt,只不過是基於文件的socket通信,而不是跟上面的數據共享啊空間共享啊之類的機制,我們之前學的是基於網絡的socket通信,還記得socket的兩個家族嗎,一個文件的一個網絡的,所以將來如果說這些鎖之類的報錯,可能你看到的就是類似於socket的錯誤,簡單知道一下就可以啦~~~

工作中常用的是鎖,信號量和事件不常用,但是信號量和事件面試的時候會問到,你能知道就行啦~~~

二 進程池和mutiprocess.Poll

  為什麽要有進程池?進程池的概念。

  在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那麽在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程麽?首先,創建進程需要消耗時間,銷毀進程(空間,變量,文件信息等等的內容)也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還需要進行切換並且記錄每個進程的執行節點,也就是記錄上下文(各種變量等等亂七八糟的東西,雖然你看不到,但是操作系統都要做),這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。就看我們上面的一些代碼例子,你會發現有些程序是不是執行的時候比較慢才出結果,就是這個原因,那麽我們要怎麽做呢?

  在這裏,要給大家介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那麽同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果

multiprocess.Poll模塊

  創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然後自始至終使用這三個進程去執行所有任務(高級一些的進程池可以根據你的並發量,搞成動態增加或減少進程池中的進程數量的操作),不會開啟其他進程,提高操作系統效率,減少空間的占用等。

  概念介紹:

Pool([numprocess  [,initializer [, initargs]]]):創建進程池

技術分享圖片
numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值
initializer:是每個工作進程啟動時要執行的可調用對象,默認為None
initargs:是要傳給initializer的參數組

技術分享圖片 技術分享圖片
p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。
‘‘‘需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()‘‘‘

p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。
‘‘‘此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。‘‘‘
    
p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成

P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用
技術分享圖片

技術分享圖片
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
obj.ready():如果調用完成,返回True
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數

技術分享圖片 技術分享圖片
import time
from multiprocessing import Pool,Process

#針對range(100)這種參數的
# def func(n):
#     for i in range(3):
#         print(n + 1)

def func(n):
    print(n)
    # 結果:
    #     (1, 2)
    #     alex
def func2(n):
    for i in range(3):
        print(n - 1)
if __name__ == ‘__main__‘:
    #1.進程池的模式
    s1 = time.time()  #我們計算一下開多進程和進程池的執行效率
    poll = Pool(5) #創建含有5個進程的進程池
    # poll.map(func,range(100)) #異步調用進程,開啟100個任務,map自帶join的功能
    poll.map(func,[(1,2),‘alex‘]) #異步調用進程,開啟100個任務,map自帶join的功能
    # poll.map(func2,range(100))  #如果想讓進程池完成不同的任務,可以直接這樣搞
    #map只限於接收一個可叠代的數據類型參數,列表啊,元祖啊等等,如果想做其他的參數之類的操作,需要用後面我們要學的方法。
    # t1 = time.time() - s1
    #
    # #2.多進程的模式
    # s2 = time.time()
    # p_list = []
    # for i in range(100):
    #     p = Process(target=func,args=(i,))
    #     p_list.append(p)
    #     p.start()
    # [pp.join() for pp in p_list]
    # t2 = time.time() - s2
    #
    # print(‘t1>>‘,t1) #結果:0.5146853923797607s 進程池的效率高
    # print(‘t2>>‘,t2) #結果:12.092015027999878s
技術分享圖片

  有一點,map是異步執行的,並且自帶close和join

  一般約定俗成的是進程池中的進程數量為CPU的數量,工作中要看具體情況來考量。

  實際應用代碼示例:

  同步與異步兩種執行方式:

技術分享圖片 技術分享圖片
import os,time
from multiprocessing import Pool

def work(n):
    print(‘%s run‘ %os.getpid())
    time.sleep(1)
    return n**2

if __name__ == ‘__main__‘:
    p=Pool(3) #進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
                                    # 但不管該任務是否存在阻塞,同步調用都會在原地等著
        res_l.append(res)
    print(res_l)
技術分享圖片

技術分享圖片 技術分享圖片
import os
import time
import random
from multiprocessing import Pool

def work(n):
    print(‘%s run‘ %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == ‘__main__‘:
    p=Pool(3) #進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行,並且可以執行不同的任務,傳送任意的參數了。
                                          # 返回結果之後,將結果放入列表,歸還進程,之後再執行新的任務
                                          # 需要註意的是,進程池中的三個進程不會同時開啟或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  
        res_l.append(res)

    # 異步apply_async用法:如果使用異步提交的任務,主進程需要使用join,等待進程池內任務都處理完,然後可以用get收集結果
    # 否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了
    p.close() #不是關閉進程池,而是結束進程池接收任務,確保沒有新任務再提交過來。
    p.join()   #感知進程池中的任務已經執行結束,只有當沒有新的任務添加進來的時候,才能感知到任務結束了,所以在join之前必須加上close方法
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
技術分享圖片

技術分享圖片 技術分享圖片
#一:使用進程池(異步調用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res)
        # s = res.get() #如果直接用res這個結果對象調用get方法獲取結果的話,這個程序就變成了同步,因為get方法直接就在這裏等著你創建的進程的結果,第一個進程創建了,並且去執行了,那麽get就會等著第一個進程的結果,沒有結果就一直等著,那麽主進程的for循環是無法繼續的,所以你會發現變成了同步的效果
    print("==============================>") #沒有後面的join,或get,則程序整體結束,進程池中的任務還沒來得及全部執行完也都跟著主進程一起結束了

    pool.close() #關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證明結果已經計算完畢,剩下的事情就是調用每個對象下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get

#二:使用進程池(同步調用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個
    print("==============================>")
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法
        print(i)
技術分享圖片

  進程池版的socket並發聊天代碼示例:

技術分享圖片 技術分享圖片
#Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count())
#開啟6個客戶端,會發現2個客戶端處於等待狀態
#在每個進程內查看pid,會發現pid使用為4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((‘127.0.0.1‘,8080))
server.listen(5)

def talk(conn):
    print(‘進程pid: %s‘ %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == ‘__main__‘:
    p=Pool(4)
    while True:
        conn,*_=server.accept()
        p.apply_async(talk,args=(conn,))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
技術分享圖片

技術分享圖片 技術分享圖片
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))


while True:
    msg=input(‘>>: ‘).strip()
    if not msg:continue

    client.send(msg.encode(‘utf-8‘))
    msg=client.recv(1024)
    print(msg.decode(‘utf-8‘))
技術分享圖片

  

  發現:並發開啟多個客戶端,服務端同一時間只有4個不同的pid,只能結束一個客戶端,另外一個客戶端才會進來.

  同時最多和4個人進行聊天,因為進程池中只有4個進程可供調用,那有同學會問,我們這麽多人想同時聊天怎麽辦,又不讓用多進程,進程池也不能開太多的進程,那咋整啊,後面我們會學到多線程,到時候大家就知道了,現在你們先這樣記住就好啦

  然後我們再提一個回調函數

需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,但是我們也可以通過進程通信來拿到返回值,進程池的這個回調也是進程通信的機制完成的。

我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果

  

技術分享圖片 技術分享圖片
import os
from multiprocessing import Pool

def func1(n):
    print(‘func1>>‘,os.getpid())
    print(‘func1‘)
    return n*n

def func2(nn):
    print(‘func2>>‘,os.getpid())
    print(‘func2‘)
    print(nn)
    # import time
    # time.sleep(0.5)
if __name__ == ‘__main__‘:
    print(‘主進程:‘,os.getpid())
    p = Pool(5)
    #args裏面的10給了func1,func1的返回值作為回調函數的參數給了callback對應的函數,不能直接給回調函數直接傳參數,他只能是你任務函數func1的函數的返回值
    # for i in range(10,20): #如果是多個進程來執行任務,那麽當所有子進程將結果給了回調函數之後,回調函數又是在主進程上執行的,那麽就會出現打印結果是同步的效果。我們上面func2裏面註銷的時間模塊打開看看
    #     p.apply_async(func1,args=(i,),callback=func2)
    p.apply_async(func1,args=(10,),callback=func2)

    p.close()
    p.join()

#結果
# 主進程: 11852  #發現回調函數是在主進程中完成的,其實如果是在子進程中完成的,那我們直接將代碼寫在子進程的任務函數func1裏面就行了,對不對,這也是為什麽稱為回調函數的原因。
# func1>> 17332
# func1
# func2>> 11852
# func2
# 100
技術分享圖片

  回調函數在寫的時候註意一點,回調函數的形參執行有一個,如果你的執行函數有多個返回值,那麽也可以被回調函數的這一個形參接收,接收的是一個元祖,包含著你執行函數的所有返回值。

  

  使用進程池來搞爬蟲的時候,最耗時間的是請求地址的網絡請求延遲,那麽如果我們在將處理數據的操作加到每個子進程中,那麽所有在進程池後面排隊的進程就需要等更長的時間才能獲取進程池裏面的執行進程來執行自己,所以一般我們就將請求作成一個執行函數,通過進程池去異步執行,剩下的數據處理的內容放到另外一個進程或者主進程中去執行,將網絡延遲的時間也利用起來,效率更高。

  requests這個模塊的get方法請求頁面,就和我們在瀏覽器上輸入一個網址然後回車去請求別人的網站的效果是一樣的。安裝requests模塊的指令:在cmd窗口執行pip install requests。

技術分享圖片
import requests
response = requests.get(‘http://www.baidu.com‘)
print(response)
print(response.status_code) #200正常,404找不到網頁,503等5開頭的是人家網站內部錯誤
print(response.content.decode(‘utf-8‘))

  

技術分享圖片 技術分享圖片
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print(‘<進程%s> get %s‘ %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {‘url‘:url,‘text‘:respone.text}

def pasrse_page(res):
    print(‘<進程%s> parse %s‘ %(os.getpid(),res[‘url‘]))
    parse_res=‘url:<%s> size:[%s]\n‘ %(res[‘url‘],len(res[‘text‘]))
    with open(‘db.txt‘,‘a‘) as f:
        f.write(parse_res)


if __name__ == ‘__main__‘:
    urls=[
        ‘https://www.baidu.com‘,
        ‘https://www.python.org‘,
        ‘https://www.openstack.org‘,
        ‘https://help.github.com/‘,
        ‘http://www.sina.com.cn/‘
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了

‘‘‘
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{‘url‘: ‘https://www.baidu.com‘, ‘text‘: ‘<!DOCTYPE html>\r\n...‘,...}]
‘‘‘
技術分享圖片

技術分享圖片 技術分享圖片
from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            ‘index‘:item[0],
            ‘title‘:item[1],
            ‘actor‘:item[2].strip()[3:],
            ‘time‘:item[3][5:],
            ‘score‘:item[4]+item[5]

        }
        print(dic)
if __name__ == ‘__main__‘:
    pattern1=re.compile(r‘<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<‘,re.S)

    url_dic={
        ‘http://maoyan.com/board/7‘:pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get(‘http://maoyan.com/board/7‘)
    # print(re.findall(pattern,res.text))
技術分享圖片

  如果在主進程中等待進程池中所有任務都執行完畢後,再統一處理結果,則無需回調函數

技術分享圖片 技術分享圖片
from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == ‘__main__‘:
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中所有進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有結果
    print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理
技術分享圖片

  進程池和信號量的區別:

  進程池是多個需要被執行的任務在進程池外面排隊等待獲取進程對象去執行自己,而信號量是一堆進程等待著去執行一段邏輯代碼。

  信號量不能控制創建多少個進程,但是可以控制同時多少個進程能夠執行,但是進程池能控制你可以創建多少個進程。

  舉例:就像那些開大車拉煤的,信號量是什麽呢,就好比我只有五個車道,你每次只能過5輛車,但是不影響你創建100輛車,但是進程池相當於什麽呢?相當於你只有5輛車,每次5個車拉東西,拉完你再把車放回來,給別的人拉煤用。

  其他語言裏面有更高級的進程池,在設置的時候,可以將進程池中的進程動態的創建出來,當需求增大的時候,就會自動在進程池中添加進程,需求小的時候,自動減少進程,並且可以設置進程數量的上線,最多為多,python裏面沒有。

python D32 管道、線程池