1. 程式人生 > 實用技巧 >multiprocessing多程序模組

multiprocessing多程序模組

multiprocessing多程序模組

前言

  其實multiprocessing模組與threading模組的介面都非常相似,但是有一些地方有一些細微差別。所以本文是基於前面的threading模組的一些知識對multiprocessing模組進行講解的。

  他們的主要區別有以下幾點

  1.建立子程序的方式針對不同平臺有著差異化

  2.關於守護執行緒的設定介面是setDaemon(True),而關於守護程序的介面是deamon = True

  3.multiprocessing模組下的獲取程序名與設定程序名沒有threading模組下的getName()setName()

,而是直接採取屬性name進行操作

  4.多程序中資料共享不能使用普通的queue模組下提供的佇列進行資料共享,而應使用multiprocessing中提供的Queue

  5.multiprocessing模組下中提供的Queue先進先出佇列沒有task_done()join(),他們都在JoinableQueue中,並且該模組下沒有提供LifoQueue後進先出佇列與PriorityQueue優先順序佇列

  官方中文文件

  threading模組基本使用

多程序與多執行緒工作的區別

多執行緒工作方式


  多執行緒的工作方式實際上在第一篇的時候,我們已經說過了。因為執行緒必須存在於程序之中,是最小的執行單元,所以你可以將它如此理解:

  其實就是不斷的往程序這個小房間加人,那麼它的優點如下:

  開一條新的執行緒比開一條新的程序開銷要小很多

  並且對於執行緒的切換來說代價也要小很多

  多條執行緒共有該程序下的所有資源,資料共享比較容易實現

  而CPython由於GIL鎖的設定,所以它的多執行緒是殘缺不全的,因此在很多時候我們依然要用到多程序,雖然這種情況比較少。

多程序工作方式


  其實就是不斷的造出一模一樣的小房間,那麼它的優點如下:

  雖然說,新開一條程序比新開一條執行緒的代價大很多,但是由於CPython中GIL鎖的設定想在多執行緒的情況下實現並行是不可能的,只有多程序才能夠實現並行。

  可以說是唯一優點了,但是我們依然要學習一下multiprocessing模組,它的學習代價並不是很大,所以接下來正式進入multiprocessing模組的學習。

基本使用

針對不同平臺的程序啟動方式


  對於程序啟動方式來說,其實multiprocessing模組中對於不同平臺下有不同的啟動方式。如下:

  spawn:這玩意兒相當於建立了一個新的直譯器程序,對比其他兩種方法,這種方法速度上比較慢,但是它是Windows平臺下預設的啟動方式(Unix系統下可用)。並且在windows平臺下,我們應該在if __name__ == '__main__'下進行新程序的啟動。但是我依然認為不管在哪個平臺下不論執行緒還是程序都應該在if __name__ == '__main__'這條語句下啟動。

  fork:這種啟動方式是通過os.fork()來產生一個新的直譯器分叉,是Unix系統的預設啟動方式。

  forkserver:這個我也看不太明白,直接把官方文件搬過來。如果有懂的大神可以解釋一下。

  程式啟動並選擇forkserver 啟動方法時,將啟動伺服器程序。從那時起,每當需要一個新程序時,父程序就會連線到伺服器並請求它分叉一個新程序。分叉伺服器程序是單執行緒的,因此使用 os.fork() 是安全的。沒有不必要的資源被繼承。可在Unix平臺上使用,支援通過Unix管道傳遞檔案描述符。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__': # <--- 強烈注意!在windows平臺下開多程序一定要在該條語句之下,否則會丟擲異常!!
    mp.set_start_method('spawn')  # 選擇啟動方式
    q = mp.Queue() # 例項化出用於程序間資料共享的管道
    p = mp.Process(target=foo, args=(q,))  
    p.start() # 啟動程序任務,等待CPU排程執行
    print(q.get())  # 從管道中拿出資料
    p.join()  # 阻塞至子程序執行完畢

例項化Process類建立子程序


  其實感覺上面的方法都已經將本章要寫的內容舉例了一個七七八八,但是我們接著往下看。與threading模組中建立多執行緒的方式一樣,multiprocessing模組建立多程序的方式也有兩種,所以我們將之前的示例拿過來直接改一改就好。

import multiprocessing
import time

print("主程序任務開始處理")


def task(th_name):
    print("子程序任務開始處理,引數:{0}".format(th_name))
    time.sleep(3)  
    print("子程序任務處理完畢")


if __name__ == '__main__':  # <--- Windows平臺下必須在該條語句下執行

    # ==== 例項化出Process類並新增子程序任務以及引數 ====

    p1 = multiprocessing.Process(target=task, args=("程序[1]",))  # <-- 引數必須新增逗號。因為是args所以會打散,如果不加逗號則不能進行打散會丟擲異常
    p1.start()  # 等待CPU排程..請注意這裡不是立即執行

    print("主程序任務處理完畢")

# ==== 執行結果 ====

"""
主程序任務開始處理
主程序任務處理完畢
主程序任務開始處理
子程序任務開始處理,引數:程序[1]
子程序任務處理完畢
"""

  這裡我們看執行結果,主程序任務開始處理列印了兩次,而主程序任務處理完畢列印了一次,這是為什麼呢?由於我們是在Windows平臺下,所以它預設的程序啟動方式為spawn,即建立了一個新的直譯器程序並開始執行,所以上面的主程序任務開始處理就列印了兩次,一次是主程序,一次是新建立的子程序。而下面由於if __name__ == '__main__':這條語句,子程序並不會執行該語句下面的程式碼塊,所以主程序任務處理完畢就只打印了一次。

自定義類繼承Process並覆寫run方法


import multiprocessing
import time

print("主程序任務開始處理")


class Processing(multiprocessing.Process):
    """自定義類"""

    def __init__(self, th_name):
        self.th_name = th_name
        super(Processing, self).__init__()

    def run(self):
        print("子程序任務開始處理,引數:{0}".format(self.th_name))
        time.sleep(3)
        print("子程序任務處理完畢")


if __name__ == '__main__':
    p1 = Processing("程序[1]")
    p1.start()  # 等待CPU排程..請注意這裡不是立即執行

    print("主程序任務處理完畢")

# ==== 執行結果 ====

"""
主程序任務開始處理
主程序任務處理完畢
主程序任務開始處理
子程序任務開始處理,引數:程序[1]
子程序任務處理完畢
"""

multiprocessing方法大全

  multiprocessing模組中的方法參考了thrading模組中的方法。但是我們一般用下面兩個方法就夠了,他們都可以拿到具體的程序物件。

multiprocessing模組方法大全
方法/屬性名稱 功能描述
multiprocessing.active_children() 檢視當前程序存活了的所有子程序物件,以列表形式返回。
multiprocessing.current_process() 獲取當前程序物件。

程序物件方法大全

程序物件方法大全(即Process類的例項物件)
方法/屬性名稱 功能描述
start() 啟動程序,該方法不會立即執行,而是告訴CPU自己準備好了,可以隨時排程,而非立即啟動。
run() 一般是自定義類繼承Process類並覆寫的方法,即執行緒的詳細任務邏輯。
join(timeout=None) 主程序預設會等待子程序執行結束後再繼續執行,timeout為等待的秒數,如不設定該引數則一直等待。
name 可以通過 = 給該程序設定一個通俗的名字。如直接使用該屬性則返回該程序的預設名字。
is_alive() 檢視程序是否存活,返回布林值。
daemon 可以通過 = 給該程序設定一個守護程序。如直接使用該屬性則是檢視程序是否為一個守護程序,返回布林值。預設為False
pid 返回程序ID。在生成該程序之前,這將是 None
exitcode 子程序的退出程式碼。如果程序尚未終止,這將是 None 。負值 -N 表示子程序被訊號 N 終止。
authkey 程序的身份驗證金鑰(位元組字串)。
sentinel 系統物件的數字控制代碼,當程序結束時將變為 "ready" 。
terminate() 終止程序。
kill() 同上
close() 關閉 Process 物件,釋放與之關聯的所有資源。如果底層程序仍在執行,則會引發 ValueError 。一旦 close() 成功返回, Process 物件的大多數其他方法和屬性將引發 ValueError
注意 start()join()is_alive()terminate()exitcode 方法只能由建立程序物件的程序呼叫。

程序物件的好夥伴(即Process類的例項物件)
os.getpid() 返回程序ID。

與threading模組的介面異同

守護程序daemon


import multiprocessing
import time

print("主程序任務開始處理")


def task(th_name):
    print("子程序任務開始處理,引數:{0}".format(th_name))
    time.sleep(3)
    print("子程序任務處理完畢")


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task, args=("程序[1]",))

    p1.daemon = True  # <-- 設定程序物件p1為守護程序,注意這一步一定要放在start之前。
    p1.start()  # 等待CPU排程..請注意這裡不是立即執行
    
    time.sleep(2)
    
    print("主程序任務處理完畢")

# ==== 執行結果 ====  #  print("子程序任務處理完畢") 可以看到該句沒有執行
 
"""
主程序任務開始處理
主程序任務開始處理
子程序任務開始處理,引數:程序[1]
主程序任務處理完畢
"""

設定與獲取程序名


import multiprocessing
import time

print("主程序任務開始處理")


def task(th_name):
    print("子程序任務開始處理,引數:{0}".format(th_name))
    obj  =  multiprocessing.current_process()  # 獲取當前程序物件
    print("獲取當前的程序名:{0}".format(obj.name))
    print("開始設定程序名")
    obj.name = "yyy"
    print("獲取修改後的程序名:{0}".format(obj.name))
    time.sleep(3)  
    print("子程序任務處理完畢")


if __name__ == '__main__':
    # ==== 第一步:例項化出Process類並新增子程序任務以及引數 ====

    t1 = multiprocessing.Process(target=task, args=("程序[1]",),name="xxx")
    t1.start()  # 等待CPU排程..請注意這裡不是立即執行

    print("主程序名:",multiprocessing.current_process().name)  # 直接使用屬性 name
    print("主程序任務處理完畢")

# ==== 執行結果 ====

"""
主程序任務開始處理
主程序名: MainProcess
主程序任務處理完畢
主程序任務開始處理
子程序任務開始處理,引數:程序[1]
獲取當前的程序名:xxx
開始設定程序名
獲取修改後的程序名:yyy
子程序任務處理完畢
"""

鎖相關演示

  鎖的使用和threading模組中鎖的使用相同,所以我們舉例一個Lock鎖即可。

import multiprocessing

lock = multiprocessing.Lock()  # 例項化同步鎖物件  # 注意!!! 在Windows平臺下,我們應該將鎖的例項化放在上面,這樣子程序才能拿到鎖物件。否則就會丟擲異常!!!或者也可以將鎖物件傳入當做形參進行傳入,二者選其一

num = 0

def add():
    lock.acquire()  # 上鎖
    global num
    for i in range(10000000):  # 一千萬次
        num += 1
    lock.release()  # 解鎖


def sub():
    lock.acquire()  # 上鎖
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1
    lock.release()  # 解鎖


if __name__ == '__main__':


    t1 = multiprocessing.Process(target=add, )
    t2 = multiprocessing.Process(target=sub, )
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    print("最終結果:", num)

# ==== 執行結果 ==== 三次採集

"""
最終結果: 0
最終結果: 0
最終結果: 0
"""
from multiprocessing import Process, Lock
​
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
​
if __name__ == '__main__':
    lock = Lock() # 將鎖例項化後傳入
for num in range(10):
        Process(target=f, args=(lock, num)).start()
將鎖當做引數傳入

三種程序資料共享的方式

multiprocessing.Queue


  這裡一定要使用multiprocessing中的Queue,如果你想用佇列中的task_done()join()方法,你應該匯入JoinableQueue這個佇列。

multiprocessing.Queue方法大全
方法名稱 功能描述
Queue.qsize() 返回當前佇列的大小
Queue.empty() 判斷當前佇列是否為空
Queue.full() 判斷當前佇列是否已滿
Queue.put(item, block=True, timeout=None) item放入佇列中,block引數為如果要操作的佇列目前已滿是否阻塞,timeout為超時時間。
Queue.put_nowait(item) 相當於 put(item, False),如果操作的佇列已滿則不進行阻塞,而是丟擲Full異常。
Queue.get(block=True, timeout=None) 將專案從佇列中取出,block引數為如果要操作的佇列目前為空是否阻塞,timeout為超時時間。
Queue.get_nowait() 相當於 get(False),如果要操作的佇列為空則不進行阻塞,而是丟擲Empty異常。
Queue.close() 指示當前程序將不會再往佇列中放入物件。一旦所有緩衝區中的資料被寫入管道之後,後臺的執行緒會退出。這個方法在佇列被gc回收時會自動呼叫。
Queue.join_thread() 等待後臺執行緒。這個方法僅在呼叫了 close() 方法之後可用。這會阻塞當前程序,直到後臺執行緒退出,確保所有緩衝區中的資料都被寫入管道中。
Queue.cancel_join_thread() 防止 join_thread() 方法阻塞當前程序。具體而言,這防止程序退出時自動等待後臺執行緒退出。詳見 join_thread()

  程序佇列multiprocessing.Queue不同於執行緒佇列queue.Queue,程序佇列的消耗和底層實現比執行緒佇列的要複雜許多。還是因為各程序之間不能共享任何資料,所以只能通過對映的方式來傳遞資料。程序佇列multiprocessing.Queue作為資料安全型別的資料結構,放在多程序中做通訊使用是非常合適的,但是同時它的消耗也是非常大的,能不使用則儘量不要使用。

import time
import multiprocessing
from multiprocessing import Queue,JoinableQueue


def task_1(q):
    print("正在裝東西..")
    time.sleep(3)
    q.put("玫瑰花")  # 正在裝東西
    q.task_done()  # 通知對方可以取了


def task_2(q):
    q.join() # 阻塞等待通知,接到通知說明佇列裡裡有東西了。
    print("取到了",q.get())  # 取東西


if __name__ == '__main__':

    q = JoinableQueue(maxsize=5)  # 例項化佇列

    t1 = multiprocessing.Process(target=task_1,args=(q,),name="小明")  # 將佇列傳進子程序任務中
    t2 = multiprocessing.Process(target=task_2,args=(q,),name="小花")

    t1.start()
    t2.start()

# ==== 執行結果 ====

"""
正在裝東西..
取到了 玫瑰花
"""
程序佇列Queue實現程序間的資料共享

  什麼執行緒佇列queue.Queue不能做到程序間資料共享呢,這是因為程序佇列multiprocessing.Queue會採取一種對映的方式來同步資料,所以說程序佇列的資源消耗比執行緒佇列要龐大很多。執行緒中所有資訊共享,所以執行緒佇列根本不需要對映關係。程序佇列只是告訴你可以這樣使用它達到程序間的資料共享,但是並不推薦你濫用它。

multiprocessing.Pipe


  除開使用程序佇列來實現程序間的通訊,multiprocessing還提供了Pipe管道來進行通訊。他的資源消耗較少並且使用便捷,但是唯一的缺點便是只支援點對點

  Pipe有點類似socket通訊。但是比socket通訊更加簡單,它不需要去做字串處理位元組,先來看一個例項:

import multiprocessing
from multiprocessing import Pipe

def task_1(conn1):
    conn1.send("hello,我是task1")
    print(conn1.recv())

def task_2(conn2):
    print(conn2.recv())
    conn2.send("我收到了,我是task2")

if __name__ == '__main__':
    conn1,conn2 = Pipe()  # 建立兩個電話
    p1 = multiprocessing.Process(target=task_1,args=(conn1,))  # 一人一部電話
    p2 = multiprocessing.Process(target=task_2,args=(conn2,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

# ==== 執行結果 ====

"""
hello,我是task1
我收到了,我是task2
"""
Pipe實現程序間的資料共享

multiprocessing.Mangaer


  除了程序佇列multiprocessing.Queue,管道Pipemultiprocessing還提供了Manager作為共享變數來提供使用,但是這種方式是不應該被直接使用的因為它本身相較於程序佇列Queue是資料不安全的。當多個程序同時修改一個共享變數勢必導致結果出現問題,所以要想使用共享變數還得使用multiprocessin提供的程序鎖才行。

  Manager類是資料不安全的;

  Mangaer類支援的型別非常多,如:value, Array, List, Dict, Queue(程序池通訊專用), Lock等。

  Mangaer實現了上下文管理器,可使用with語句建立多個物件。具體使用方法我們來看一下:

  

import multiprocessing
from multiprocessing import Manager

def task_1(dic):
    dic["task_1"] = "大帥哥"

def task_2(dic):
    dic["task_2"] = "大美女"
    print(dic.get("task_1"))

if __name__ == '__main__':
    with Manager() as m: # !!!!! 注意 !!!!!!! 如果對 Manager()中的資料型別進行頻繁的操作,而程序又特別多的時候,請使用 Rlock 鎖進行處理,這有可能引發執行緒不安全!!!
        dic = m.dict()  # 例項化出了一個字典,除此之外還有很多其他的資料型別

        p1 = multiprocessing.Process(target=task_1,args=(dic,))  # 將字典傳進來
        p2 = multiprocessing.Process(target=task_2,args=(dic,))

        p1.start()  # 啟動一定要放在with之後
        p2.start()

        p1.join()
        p2.join()

# ==== 執行結果 ====

"""
大帥哥
"""
Manager實現程序間的資料共享
import multiprocessing
from multiprocessing import Manager

def task_1(dic):
    for i in range(1000):
        dic["count"] += 1

def task_2(dic):
    for i in range(1000):
        dic["count"] -= 1

if __name__ == '__main__':
    with Manager() as m: # !!!!! 注意 !!!!!!! 如果對 Manager()中的資料型別進行頻繁的操作,而程序又特別多的時候,請使用 Rlock 鎖進行處理,這有可能引發執行緒不安全!!!
        dic = m.dict({"count":0})  # 例項化出了一個字典,除此之外還有很多其他的資料型別

        p1 = multiprocessing.Process(target=task_1,args=(dic,)) # 傳字典
        p2 = multiprocessing.Process(target=task_2,args=(dic,))
        p1.start()
        p2.start()

        p1.join()
        p2.join()
        print(dic)

# ==== 執行結果 ====

"""
{'count': -23}
"""
程序安全問題
import multiprocessing
from multiprocessing import Manager
from multiprocessing import RLock

def task_1(dic,lock):
    with lock:
        for i in range(1000):
            dic["count"] += 1

def task_2(dic,lock):
    with lock:
        for i in range(1000):
            dic["count"] -= 1

if __name__ == '__main__':

    lock = RLock() # 例項化鎖

    with Manager() as m: # !!!!! 注意 !!!!!!! 如果對 Manager()中的資料型別進行頻繁的操作,而程序又特別多的時候,請使用 Rlock 鎖進行處理,這有可能引發執行緒不安全!!!
        dic = m.dict({"count":0})  # 例項化出了一個字典,除此之外還有很多其他的資料型別

        p1 = multiprocessing.Process(target=task_1,args=(dic,lock,))  # 傳字典,傳鎖
        p2 = multiprocessing.Process(target=task_2,args=(dic,lock,))
        p1.start()
        p2.start()

        p1.join()
        p2.join()
        print(dic)

# ==== 執行結果 ====

"""
{'count': 0}
"""
實用Rlock鎖解決程序安全問題