
python --- multiprocessing (multi-process) module usage


1. What is a process?

  A process is a single execution of a program over some set of data. It is the basic unit of resource allocation and scheduling in the system, and the foundation of operating-system structure. In early, process-oriented computer architectures, the process was the basic executing entity of a program; in modern, thread-oriented architectures, the process is a container for threads.

A process contains at least one thread.

2. With multithreading already available in Python, why is multiprocessing programming still needed?

  Because Python has the GIL (Global Interpreter Lock), only one thread runs at any given moment (no matter how many cores your CPU has), so true multithreading cannot be achieved. How, then, can a Python program run truly in parallel? The answer is to use multiple processes instead of multiple threads. The standard library provides the multiprocessing module (a package that, like the threading module, offers an API for spawning processes; it supports both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads). Its API almost completely mirrors that of the threading module, and of course it also has some APIs that the threading module does not.

Example 1 (simple use of the multiprocessing module):

import multiprocessing, time

class Task(multiprocessing.Process):
    def __init__(self):
        super(Task, self).__init__()

    def run(self):                              # run() is executed in the child process after start()
        print("Process---%s" % self.name)
        time.sleep(2)


if __name__ == "__main__":
    for i in range(1, 8+1):
        t = Task()
        t.start()

  Note: since the basic multiprocessing API matches the threading module, it will not be demonstrated method by method here; this article focuses on the APIs where multiprocessing differs from threading. For the APIs shared with threading, see: http://www.cnblogs.com/God-Li/p/7732407.html

multiprocessing.Process source (simplified):

class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        self.name = ''
        self.daemon = False      # daemon flag; must be set before start()
        self.authkey = None      # the process's authentication key (a byte string)
        self.exitcode = None     # the child's exit code; None if the process has not yet terminated; a negative value -N indicates the child was terminated by signal N
        self.ident = 0
        self.pid = 0             # process ID; this will be None before the process is spawned
        self.sentinel = None     # a numeric handle of a system object which becomes "ready" when the process ends

    def run(self):
        pass

    def start(self):
        pass

    def terminate(self):
        """
        Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used.
        Note that exit handlers and finally clauses, etc., will not be executed.
        Note that descendant processes of the process will not be terminated; they will simply become orphaned.
        :return:
        """
        pass

    def join(self, timeout=None):
        pass

    def is_alive(self):
        return False
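
Here is a minimal sketch (not from the original article; the function name work and the process name worker-1 are made up for illustration) exercising the lifecycle attributes listed above:

import multiprocessing, time

def work():
    time.sleep(1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=work, name="worker-1")
    print(p.pid, p.exitcode)   # None None -- not started yet
    p.start()
    print(p.is_alive())        # True while the child is running
    p.join()                   # wait for the child to finish
    print(p.pid, p.exitcode)   # the child's pid and exit code 0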

Queues in the multiprocessing module:

  class multiprocessing.Queue([maxsize]) implements all of the methods of queue.Queue except task_done() and join(). The methods it has that queue.Queue lacks are listed below:

class multiprocessing.Queue([maxsize])
    close()
    """
    Indicate that no more data will be put on this queue by the current process.
    The background thread will quit once it has flushed all buffered data to the pipe.
    This is called automatically when the queue is garbage collected.
    """

    join_thread()
    """
    Join the background thread. This can only be used after close() has been called. It blocks until the
    background thread exits, ensuring that all data in the buffer has been flushed to the pipe.
    By default, if a process is not the creator of the queue then on exit it will attempt to join the
    queue's background thread. The process can call cancel_join_thread() to make join_thread() do nothing.
    """

    cancel_join_thread()
    """
    Make join_thread() do nothing.
    """

  class multiprocessing.SimpleQueue is a simplified version of class multiprocessing.Queue([maxsize]), with only three methods: empty(), get() and put().

  class multiprocessing.JoinableQueue([maxsize]) is a subclass of class multiprocessing.Queue([maxsize]) that adds the task_done() and join() methods.

  Note: since processes do not share memory, an instantiated queue object must be passed as an argument into the other processes before they can use it. Each time it is passed in, the receiving process gets its own copy of the queue wrapper, independent of the original; the underlying pipe is shared, however, so Python keeps the queued data in sync across processes. This differs from the threading case, where there is literally a single queue object.
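
A minimal sketch (not part of the original examples; the producer function is made up) of passing a queue into a child process:

import multiprocessing

def producer(q):
    q.put("hello from child %s" % multiprocessing.current_process().name)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))   # the queue must be passed in explicitly
    p.start()
    print(q.get())   # blocks until the child has put an item
    p.join()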

Communication between processes:

  multiprocessing.Pipe([duplex]) --------------- returns a pair (conn1, conn2) of Connection objects representing the two ends of the pipe (similar to a socket connection; they can be used to send and receive data).

    If duplex is True (the default), the pipe is bidirectional. If duplex is False, the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 only for sending messages.

  Example 2 (demonstrating multiprocessing.Pipe):

import multiprocessing, time

class Processing_1(multiprocessing.Process):
    def __init__(self, conn):
        super(Processing_1, self).__init__()
        self.conn = conn

    def run(self):
        send_data = "this message is from p1"
        self.conn.send(send_data)       # send data through the connection
        time.sleep(0.8)
        recv_data = self.conn.recv()    # receive data from the connection
        print("p1 recv: " + recv_data)
        self.conn.close()


class Processing_2(multiprocessing.Process):
    def __init__(self, conn):
        super(Processing_2, self).__init__()
        self.conn = conn

    def run(self):
        send_data = "this message is from p2"
        self.conn.send(send_data)
        time.sleep(0.8)
        recv_data = self.conn.recv()
        print("p2 recv: " + recv_data)
        self.conn.close()

if __name__ == "__main__":
    conn1, conn2 = multiprocessing.Pipe()   # instantiate a Pipe; conn1 and conn2 are its two ends

    p1 = Processing_1(conn1)   # pass a connection object into each child process
    p2 = Processing_2(conn2)

    p1.start()
    p2.start()

    p1.join()
    p2.join()

Data sharing between processes:

  multiprocessing.Manager() ----------- returns a started SyncManager object which can be used to share objects between processes. The returned manager object corresponds to a spawned child process, and it has methods which will create shared objects and return corresponding proxies. The manager process will be shut down as soon as it is garbage collected or its parent process exits.

  Example 3 (simple use of Manager):

import multiprocessing, time
import os

class Processing(multiprocessing.Process):
    def __init__(self, d, l):
        super(Processing, self).__init__()
        self.d = d
        self.l = l

    def run(self):
        self.d[os.getpid()] = os.getpid()    # use it like a normal dict
        self.l.append(1)
        print(self.l)

if __name__ == "__main__":

    manager = multiprocessing.Manager()   # create the Manager object
    d = manager.dict()   # create a shared dict
    l = manager.list()   # create a shared list

    p_s = []
    for i in range(10):
        p = Processing(d, l)
        p.start()
        p_s.append(p)

    for p in p_s:
        p.join()

    print(d)
    print(l)

  A manager can create the following (commonly used) shared data objects; a short usage sketch follows the list:

Event()

Create a shared threading.Event object and return a proxy for it.

Lock()

Create a shared threading.Lock object and return a proxy for it.

Namespace()

Create a shared Namespace object and return a proxy for it.

Queue([maxsize])

Create a shared queue.Queue object and return a proxy for it.

RLock()

Create a shared threading.RLock object and return a proxy for it.

Semaphore([value])

Create a shared threading.Semaphore object and return a proxy for it.

Array(typecode, sequence)

Create an array and return a proxy for it.

Value(typecode, value)

Create an object with a writable value attribute and return a proxy for it.

dict()
dict(mapping)
dict(sequence)

Create a shared dict object and return a proxy for it.

list()
list(sequence)

Create a shared list object and return a proxy for it.
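
As promised above, a minimal usage sketch (not from the original article; the worker function and variable names are made up) combining a few of these proxies:

import multiprocessing

def worker(ns, counter, q):
    ns.status = "done"      # attribute assignment through the Namespace proxy
    counter.value += 1      # Value proxy; not atomic, but safe here with a single child
    q.put("message via manager.Queue")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    counter = manager.Value("i", 0)   # typecode 'i' = signed int
    q = manager.Queue()

    p = multiprocessing.Process(target=worker, args=(ns, counter, q))
    p.start()
    p.join()

    print(ns.status, counter.value, q.get())   # done 1 message via manager.Queue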

Process locks:

  There are two kinds of process locks: multiprocessing.Lock (a non-recursive lock) and multiprocessing.RLock (a recursive lock).

  multiprocessing.Lock (non-recursive lock): once a process or thread has acquired the lock, subsequent attempts to acquire it from any process or thread will block until it is released; any process or thread may release it.

  multiprocessing.RLock (recursive lock): a recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.

  Both locks have only two methods: acquire(block=True, timeout=None) and release(). Their usage is essentially the same as thread locks, except that the lock instance must be passed as an argument into the other processes (a sketch follows below); for thread-lock usage, see: http://www.cnblogs.com/God-Li/p/7732407.html
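
A minimal sketch (not from the original article; report is a made-up function) of sharing a Lock between processes:

import multiprocessing

def report(lock, n):
    with lock:                       # equivalent to acquire()/release()
        print("process %d holds the lock" % n)

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    ps = [multiprocessing.Process(target=report, args=(lock, i)) for i in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()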

Process pools:

  To make managing multiple processes easier, multi-process programs usually use a process pool rather than raw multiprocessing.Process objects.

  Example:

import multiprocessing, os
import time


def run():
    print(str(os.getpid()) + "-----running")
    time.sleep(2)
    print(str(os.getpid()) + "-----done")

def done():
    print("done")

def error():
    print("error")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)    # instantiate the pool object

    for i in range(8):
        # pool.apply(func=run)      # with apply, tasks in the pool run serially
        pool.apply_async(func=run)

    pool.close()
    pool.join()
    print("finish....")

  Common methods of the Pool object:

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

Submits a task to the pool; because it blocks until the result is ready, only one worker runs at a time (effectively serial execution).

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.

Submits a task to the pool, where multiple workers (the number set by the processes argument at pool creation) run concurrently. When a worker finishes, callback is invoked with the result in the parent process; if the target function fails, error_callback is invoked with the exception instance instead, also in the parent process (see the sketch after this method list).

close()

Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

After this method is called, no new tasks can be submitted to the pool; the worker processes exit once all submitted tasks have completed.

terminate()

Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.

Stops the worker processes immediately, without waiting for outstanding work to finish.

join()

Wait for the worker processes to exit. One must call close() or terminate() before using join().

Waits for the worker processes in the pool to exit; close() or terminate() must be called before join().
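
As referenced above, a minimal sketch (not from the original article; square, on_result and on_error are made up) of apply_async with callback and error_callback:

import multiprocessing

def square(x):
    if x == 3:
        raise ValueError("x must not be 3")
    return x * x

def on_result(r):
    print("result:", r)      # runs in the parent process

def on_error(e):
    print("error:", e)       # runs in the parent process

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    for i in range(5):
        pool.apply_async(square, args=(i,), callback=on_result, error_callback=on_error)
    pool.close()
    pool.join()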

  Note: a Pool's worker processes are started (and sit blocked waiting for work) as soon as the Pool object is created; apply (or apply_async) merely submits tasks to the pool and does not create new processes.
