1. 程式人生 > 實用技巧 >python 多程序詳解(Multiprocessing模組)

python 多程序詳解(Multiprocessing模組)

python 多程序(MultiProcess)

1.Process

建立程序的類Process([group [, target [, name [, args [, kwargs]]]]]),target表示呼叫物件,args表示呼叫物件的位置引數元組,kwargs表示呼叫物件的字典,name為別名,group實質上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,start()啟動某個程序。join(timeout),使主調程序阻塞,直至被呼叫子程序執行結束或超時(如指定timeout)。
屬性:authkey、daemon(要通過start()設定)、exitcode(程序在執行時為None、如果為–N,表示被訊號N結束)、name、pid。其中daemon是父程序終止後自動終止,且自己不能產生新程序,必須在start()之前設定。

1.1 建立函式並作為單個程序

import multiprocessing
import time


def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print("p.pid:", p.pid)
    print("p.name:", p.name)
    print("p.is_alive:", p.is_alive())

結果

p.pid: 2460
p.name: Process-1
p.is_alive: True
The time is Tue Aug  4 17:31:02 2020
The time is Tue Aug  4 17:31:05 2020
The time is Tue Aug  4 17:31:08 2020
The time is Tue Aug  4 17:31:11 2020
The time is Tue Aug  4 17:31:14 2020

1.2 建立函式並作為多個程序

import multiprocessing
import time

def worker_1(interval):
    print("worker_1")
    time.sleep(interval)
    print("end worker_1")

def worker_2(interval):
    print("worker_2")
    time.sleep(interval)
    print("end worker_2")

def worker_3(interval):
    print("worker_3")
    time.sleep(interval)
    print("end worker_3")

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print("END!!!!!!!!!!!!!!!!!")

結果

The number of CPU is:8
child   p.name:Process-2	p.id2897
child   p.name:Process-1	p.id2896
child   p.name:Process-3	p.id2898
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3

1.3 daemon程式對比結果(不加daemon屬性)

import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print("end!")

結果

end!
work start:Tue Aug  4 17:37:02 2020
work end:Tue Aug  4 17:37:05 2020

1.4 daemon程式對比(設定daemon為True)

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    print("end!")

結果

end!

子程序設定了daemon屬性,主程序結束,它們就隨著結束了。

1.5 設定daemon執行完結束的方法

import multiprocessing
import time


def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))


if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()
    print("end!")

結果

work start:Tue Aug  4 17:41:11 2020
work end:Tue Aug  4 17:41:14 2020
end!

2.Lock

當多個程序需要訪問共享資源的時候,Lock可以用來避免訪問的衝突

import multiprocessing
import sys


def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()


def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()


if __name__ == "__main__":
    lock = multiprocessing.Lock()  # 宣告一個鎖
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

結果

Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with

3.Semaphore

訊號量Semaphore是一個計數器,控制對公共資源或者臨界區域的訪問量,訊號量可以指定同時訪問資源或者進入臨界區域的程序數。每次有一個程序獲得訊號量時,計數器-1,若計數器為0時,其他程序就停止訪問訊號量,一直阻塞直到其他程序釋放訊號量。
常用方法和屬性
acquire(blocking = True, timeout=None)
請求一個訊號量
release()
釋放一個訊號量

import time, random
from multiprocessing import Process, Semaphore

def ktv(i, sem):
    sem.acquire()
    print('%s 走進ktv' %i)
    time.sleep(random.randint(1, 5))
    print('%s 走出ktv' %i)
    sem.release()

if __name__ == "__main__":
    sem = Semaphore(4)
    for i in range(5):
        p = Process(target=ktv, args=(i, sem))
        p.start()

結果

0 走進ktv
3 走進ktv
4 走進ktv
1 走進ktv
1 走出ktv
2 走進ktv
2 走出ktv
0 走出ktv
3 走出ktv
4 走出ktv

4.Event

Python 多程序中 Event 是用來實現程序間同步通訊的(當然多執行緒中也可以用event)。事件event執行的機制是:全域性定義了一個Flag,如果Flag值為 False,當程式執行event.wait()方法時就會阻塞,如果Flag值為True時,程式執行event.wait()方法時不會阻塞繼續執行。
常用方法和屬性
wait(time)方法:等待 time 時間後,執行下一步。或者在呼叫 event.set() 後立即執行下一步。
set()方法:將Flag的值改成True。
clear()方法:將Flag的值改成False。
is_set()方法:判斷當前的Flag的值。

import time
import random
from multiprocessing import Process
from multiprocessing import Event


def now():
    return str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))


def traffic_light(e):  # 紅綠燈
    print(now() + ' \033[31m紅燈亮\033[0m')  # Flag 預設是False
    while True:
        if e.is_set():  # 如果是綠燈
            time.sleep(2)  # 2秒後
            print(now() + ' \033[31m紅燈亮\033[0m')  # 轉為紅燈
            e.clear()  # 設定為False

        else:  # 如果是紅燈
            time.sleep(2)  # 2秒後
            print(now() + ' \033[32m綠燈亮\033[0m')  # 轉為綠燈
            e.set()  # 設定為True

def people(e, i):
    if not e.is_set():
        print(now() + ' people %s 在等待' % i)
        e.wait()
        print(now() + ' people %s 通過了' % i)


if __name__ == '__main__':
    e = Event()  # 預設為 False,紅燈亮
    p = Process(target=traffic_light, args=(e,))  # 紅綠燈程序
    p.daemon = True
    p.start()
    process_list = []
    for i in range(6):  # 6人過馬路
        time.sleep(random.randrange(0, 4, 2))
        p = Process(target=people, args=(e, i))
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

結果

2020-08-04 17:53:34 紅燈亮
2020-08-04 17:53:36 people 0 在等待
2020-08-04 17:53:36 綠燈亮
2020-08-04 17:53:36 people 0 通過了
2020-08-04 17:53:38 紅燈亮
2020-08-04 17:53:39 people 3 在等待
2020-08-04 17:53:40 people 2 在等待
2020-08-04 17:53:40 綠燈亮
2020-08-04 17:53:40 people 2 通過了
2020-08-04 17:53:40 people 3 通過了

5.Queue

常用方法和屬性
Queue([maxsize])
建立共享的程序佇列。
引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。
底層佇列使用管道和鎖定實現
q.get( [ block [ ,timeout ] ] )
返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。
block用於控制阻塞行為,預設為True阻塞程序. 如果設定為False,不阻塞但將引發Queue.Empty異常(定義在Queue模組中)。
timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。
q.put(item [, block [,timeout ] ] )
將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。
block控制阻塞行為,預設為True阻塞。如果設定為False,不阻塞但將引發Queue.Empty異常(定義在Queue庫模組中)。
timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。

# encoding: utf-8

import os
import time
from multiprocessing import Queue, Process, freeze_support


def inputQ(queue):
    info = str(os.getpid()) + "(put):" + str(time.asctime())
    queue.put(info)


def outputQ(queue):
    info = queue.get()
    print('%s%s \033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))


if __name__ == '__main__':
    freeze_support()
    record1 = []  # store input process
    record2 = []  # stroe output process
    queue = Queue(3)

    # 輸入程序
    for i in range(10):
        process = Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)
    # 輸出程序
    for i in range(10):
        process = Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()
    for p in record2:
        p.join()

結果

7647(get): 7641(put):Tue Aug  4 17:56:15 2020
7649(get): 7637(put):Tue Aug  4 17:56:15 2020
7648(get): 7639(put):Tue Aug  4 17:56:15 2020
7646(get): 7636(put):Tue Aug  4 17:56:15 2020
7651(get): 7642(put):Tue Aug  4 17:56:15 2020
7650(get): 7640(put):Tue Aug  4 17:56:15 2020
7654(get): 7638(put):Tue Aug  4 17:56:15 2020
7652(get): 7643(put):Tue Aug  4 17:56:15 2020
7655(get): 7645(put):Tue Aug  4 17:56:15 2020
7653(get): 7644(put):Tue Aug  4 17:56:15 2020

6.Pipe

Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex引數,如果duplex引數為True(預設值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受訊息,conn2只負責傳送訊息。
send和recv方法分別是傳送和接受訊息的方法。例如,在全雙工模式下,可以呼叫conn1.send傳送訊息,conn1.recv接收訊息。如果沒有訊息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會丟擲EOFError。

import multiprocessing
import time


def proc1(pipe):
    while True:
        for i in range(10000):
            print("send: %s" % (i))
            pipe.send(i)
            time.sleep(1)


def proc2(pipe):
    while True:
        print("proc2 rev:", pipe.recv())
        time.sleep(1)





if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

結果

send: 0
proc2 rev: 0
send: 1
proc2 rev: 1
send: 2
proc2 rev: 2程序池是多個需要被執行的任務在程序池外面排隊等待獲取程序物件去執行自己, 而訊號量是一堆程序等待著去執行一段邏輯程式碼.

訊號量不能控制建立多少個程序, 但是可以控制同時多少個程序能夠執行.
程序池能控制可以建立多少個程序.
send: 3
proc2 rev: 3
send: 4
proc2 rev: 4
send: 5
proc2 rev: 5
send: 6
proc2 rev: 6
...

7.Pool

在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。當被操作物件數目不大時,可以直接利用multiprocessing中的Process動態成生多個程序,十幾個還好,但如果是上百個,上千個目標,手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。
Pool可以提供指定數量的程序,供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來它。

程序池是多個需要被執行的任務在程序池外面排隊等待獲取程序物件去執行自己, 而訊號量是一堆程序等待著去執行一段邏輯程式碼.
訊號量不能控制建立多少個程序, 但是可以控制同時多少個程序能夠執行.
程序池能控制可以建立多少個程序.

7.1 非阻塞

# coding: utf-8
import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        # 維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
        pool.apply_async(func, (msg, ))

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束
    print("Sub-process(es) done.")

結果

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
msg: hello 3
end
Sub-process(es) done.

7.2 阻塞

#coding: utf-8
import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=3)
    for i in range(4):
        msg = "hello %d" % (i)
        pool.apply(func, (msg, ))  # 維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()  # 呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束
    print("Sub-process(es) done.")

結果

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

7.3 使用程序池,關注結果

import multiprocessing
import time


def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" % (i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")

結果

msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.

8.資料共享

程序間資料是獨立的,可以藉助於佇列或管道實現通訊,二者都是基於訊息傳遞的。
雖然程序間資料獨立,但可以通過Manager實現資料共享,事實上Manager的功能遠不止於此。

8.1 list

# -*-encoding:utf-8-*-
from multiprocessing import Process, Manager
from time import sleep


def thread_a_main(sync_data_pool):  # A 程序主函式,存入100+的數
    for ix in range(100, 105):
        sleep(1)
        sync_data_pool.append(ix)


def thread_b_main(sync_data_pool):  # B 程序主函式,存入300+的數
    for ix in range(300, 309):
        sleep(0.6)
        sync_data_pool.append(ix)


def _test_case_000():  # 測試用例
    manager = Manager()  # multiprocessing 中的 Manager 是一個工廠方法,直接獲取一個 SyncManager 的例項
    sync_data_pool = manager.list()  # 利用 SyncManager 的例項來建立同步資料池
    Process(target=thread_a_main, args=(
        sync_data_pool, )).start()  # 建立並啟動 A 程序
    Process(target=thread_b_main, args=(
        sync_data_pool, )).start()  # 建立並啟動 B 程序
    for ix in range(6):  # C 程序(主程序)中實時的去檢視資料池中的資料
        sleep(1)
        print(sync_data_pool)


if '__main__' == __name__:  # 將測試用例單獨列出
    _test_case_000()

結果

[300]
[300, 100, 301, 302]
[300, 100, 301, 302, 101, 303]
[300, 100, 301, 302, 101, 303, 304, 102, 305]
[300, 100, 301, 302, 101, 303, 304, 102, 305, 103, 306, 307]
[300, 100, 301, 302, 101, 303, 304, 102, 305, 103, 306, 307, 104, 308]

8.2 dict

from multiprocessing import Manager, Lock, Process
import time


def worker(d, key, value):
    print(key, value)
    d[key] = value


if __name__ == '__main__':
    mgr = Manager()
    d = mgr.dict()
    jobs = [Process(
        target=worker, args=(d, i, i*2)) for i in range(10)]

    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:')
    print(d)

結果

6 12
7 14
9 18
0 0
8 16
2 4
5 10
1 2
3 6
4 8
Results:
{6: 12, 7: 14, 9: 18, 0: 0, 8: 16, 2: 4, 5: 10, 1: 2, 3: 6, 4: 8}