python --- using the multiprocessing module
1. What is a process?
A process is a running instance of a program operating on some set of data. It is the basic unit of resource allocation and scheduling in the operating system, and a foundation of OS design. In early, process-oriented computer architectures, the process was the basic unit of execution; in modern, thread-oriented designs, the process is a container for threads.
A process contains at least one thread.
2. With multithreading available in Python, why do we still need multiprocess programming?
Because of the GIL (Global Interpreter Lock) in Python, only one thread executes at any given moment (no matter how many cores your CPU has), so true multithreaded parallelism is impossible. How, then, can a Python program really run in parallel? The answer is to use multiple processes instead of multiple threads. The standard library provides the multiprocessing module: a package that supports spawning processes using an API similar to the threading module, offering both local and remote concurrency, and effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Its API almost exactly mirrors that of the threading module, and it also has some APIs that threading does not.
Example 1 (simple use of the multiprocessing module):
import multiprocessing, time

class Task(multiprocessing.Process):
    def __init__(self):
        super(Task, self).__init__()

    def run(self):
        print("Process---%s" % self.name)
        time.sleep(2)


if __name__ == "__main__":
    for i in range(1, 8 + 1):
        t = Task()
        t.start()
Note: since the basic API of the multiprocessing module matches the threading module, it will not be demonstrated item by item; this article focuses on the multiprocessing APIs that differ from threading. For the APIs shared with threading, see: http://www.cnblogs.com/God-Li/p/7732407.html
multiprocessing.Process source (simplified):
class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        self.name = ''
        self.daemon = False    # daemon flag; must be set before start()
        self.authkey = None    # the process's authentication key (a byte string)
        self.exitcode = None   # the child's exit code; None if the process has not
                               # yet terminated; -N means the child was terminated
                               # by signal N
        self.ident = 0
        self.pid = 0           # process ID; None before the process is spawned
        self.sentinel = None   # a numeric handle of a system object which becomes
                               # "ready" when the process ends

    def run(self):
        pass

    def start(self):
        pass

    def terminate(self):
        """
        Terminate the process. On Unix this is done using the SIGTERM signal;
        on Windows TerminateProcess() is used. Note that exit handlers, finally
        clauses, etc. will not be executed. Descendant processes of the process
        will not be terminated -- they will simply become orphaned.
        """
        pass

    def join(self, timeout=None):
        pass

    def is_alive(self):
        return False
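Besides subclassing, the signature above also supports a functional style: pass a target callable directly to the constructor. A minimal sketch (the worker function and its name are illustrative):

```python
import multiprocessing, os

def worker(n):
    # runs in a separate child process, with its own PID and memory space
    print("worker %d running in pid %d" % (n, os.getpid()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(1,), name="worker-1")
    p.start()                  # spawn the child; it calls run(), which calls target(*args)
    p.join()                   # block until the child exits
    print(p.name, p.exitcode)  # exitcode is 0 after a clean exit
```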
Queues in the multiprocessing module:
class multiprocessing.Queue([maxsize]) implements all the methods of queue.Queue except task_done() and join(). The methods it has that queue.Queue lacks are listed below:
class multiprocessing.Queue([maxsize])

    close()
        Indicate that no more data will be put on this queue by the current
        process. The background thread will quit once it has flushed all
        buffered data to the pipe. This is called automatically when the
        queue is garbage collected.

    join_thread()
        Join the background thread. This can only be used after close() has
        been called. It blocks until the background thread exits, ensuring
        that all data in the buffer has been flushed to the pipe.
        By default, if a process is not the creator of the queue then on
        exit it will attempt to join the queue's background thread. The
        process can call cancel_join_thread() to make join_thread() do
        nothing.

    cancel_join_thread()
        Make join_thread() do nothing.
class multiprocessing.SimpleQueue is a simplified multiprocessing.Queue([maxsize]) with only three methods: empty(), get(), and put().
class multiprocessing.JoinableQueue([maxsize]) is a subclass of multiprocessing.Queue([maxsize]) that adds the task_done() and join() methods.
Note: since processes do not share memory, an instantiated queue object must be passed as an argument to the other processes before they can use it. Moreover, each time it is passed in, the child effectively receives a clone that is independent of the original queue; Python synchronizes the data between them behind the scenes, unlike the multithreaded queue.Queue, where there is only one copy of the data.
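As a sketch of the point above: the queue is created in the parent and handed to the child as an argument (the producer function here is illustrative):

```python
import multiprocessing, os

def producer(q):
    # the child writes into its copy of the queue; multiprocessing
    # moves the data back to the parent behind the scenes
    q.put("hello from child %d" % os.getpid())

if __name__ == "__main__":
    q = multiprocessing.Queue()                              # created in the parent
    p = multiprocessing.Process(target=producer, args=(q,))  # queue passed as an argument
    p.start()
    print(q.get())    # the parent receives what the child put
    p.join()
```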
Communication between processes:
multiprocessing.Pipe([duplex]) --------------- returns a pair (conn1, conn2) of Connection objects representing the two ends of a pipe (similar to a socket connection: they can be used to send and receive data).
If duplex is True (the default), the pipe is bidirectional. If duplex is False, the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 only for sending messages.
Example 2 (multiprocessing.Pipe demonstration):
import multiprocessing, time

class Processing_1(multiprocessing.Process):
    def __init__(self, conn):
        super(Processing_1, self).__init__()
        self.conn = conn

    def run(self):
        send_data = "this message is from p1"
        self.conn.send(send_data)       # send data through the connection
        time.sleep(0.8)
        recv_data = self.conn.recv()    # receive data from the connection
        print("p1 recv: " + recv_data)
        self.conn.close()


class Processing_2(multiprocessing.Process):
    def __init__(self, conn):
        super(Processing_2, self).__init__()
        self.conn = conn

    def run(self):
        send_data = "this message is from p2"
        self.conn.send(send_data)
        time.sleep(0.8)
        recv_data = self.conn.recv()
        print("p2 recv: " + recv_data)
        self.conn.close()

if __name__ == "__main__":
    conn1, conn2 = multiprocessing.Pipe()   # conn1 and conn2 are the two ends of the pipe

    p1 = Processing_1(conn1)    # pass one connection object to each child process
    p2 = Processing_2(conn2)

    p1.start()
    p2.start()

    p1.join()
    p2.join()
Data sharing between processes:
multiprocessing.Manager() ----------- returns a started SyncManager object which can be used to share objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies. The manager process will be shut down as soon as it is garbage collected or its parent process exits.
Example 3 (simple use of Manager):
import multiprocessing
import os

class Processing(multiprocessing.Process):
    def __init__(self, d, l):
        super(Processing, self).__init__()
        self.d = d
        self.l = l

    def run(self):
        self.d[os.getpid()] = os.getpid()   # use it like a normal dict
        self.l.append(1)
        print(self.l)

if __name__ == "__main__":

    manager = multiprocessing.Manager()     # create the Manager object
    d = manager.dict()                      # create a shared dict
    l = manager.list()                      # create a shared list

    p_s = []
    for i in range(10):
        p = Processing(d, l)
        p.start()
        p_s.append(p)

    for p in p_s:
        p.join()

    print(d)
    print(l)
A manager can create the following (commonly used) shared data objects:
Event() - Create a shared threading.Event object and return a proxy for it.
Lock() - Create a shared threading.Lock object and return a proxy for it.
Namespace() - Create a shared Namespace object and return a proxy for it.
Queue([maxsize]) - Create a shared queue.Queue object and return a proxy for it.
RLock() - Create a shared threading.RLock object and return a proxy for it.
Semaphore([value]) - Create a shared threading.Semaphore object and return a proxy for it.
Array(typecode, sequence) - Create an array and return a proxy for it.
Value(typecode, value) - Create an object with a writable value attribute and return a proxy for it.
dict(), dict(mapping), dict(sequence) - Create a shared dict object and return a proxy for it.
list(), list(sequence) - Create a shared list object and return a proxy for it.
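These proxies are used like their threading counterparts. A minimal sketch combining Event() and Namespace() (the function and attribute names are illustrative):

```python
import multiprocessing

def waiter(ev, ns):
    ev.wait()               # block until the parent sets the event
    ns.status = "woken"     # write through the Namespace proxy

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    ev = manager.Event()
    ns = manager.Namespace()
    ns.status = "waiting"

    p = multiprocessing.Process(target=waiter, args=(ev, ns))
    p.start()
    ev.set()                # wake the child
    p.join()
    print(ns.status)        # the child's write is visible in the parent: "woken"
```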
Process locks:
There are two kinds of process locks: multiprocessing.Lock (a non-recursive lock) and multiprocessing.RLock (a recursive lock).
multiprocessing.Lock (non-recursive): once a process or thread has acquired the lock, subsequent attempts to acquire it from any process or thread will block until it is released; any process or thread may release it.
multiprocessing.RLock (recursive): a recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.
Both locks have only two methods: acquire(block=True, timeout=None) and release(). Their use is basically the same as with thread locks (except that the lock instance must be passed as an argument to the other processes): http://www.cnblogs.com/God-Li/p/7732407.html
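A minimal sketch of passing a lock instance into child processes; the shared counter uses multiprocessing.Value for illustration:

```python
import multiprocessing

def add(lock, counter):
    for _ in range(1000):
        with lock:                  # equivalent to acquire() ... release()
            counter.value += 1      # the read-modify-write must be protected

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value("i", 0)    # a shared integer
    ps = [multiprocessing.Process(target=add, args=(lock, counter))
          for _ in range(4)]
    for p in ps:
        p.start()
    for p in ps:
        p.join()
    print(counter.value)    # 4000 -- no increments are lost
```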
Process pools:
To make multiple processes easier to manage, multiprocess programs usually use a process pool rather than creating multiprocessing.Process objects directly.
Example:
import multiprocessing, os
import time


def run():
    print(str(os.getpid()) + "-----running")
    time.sleep(2)
    print(str(os.getpid()) + "-----done")

def done(result):       # callback: receives the task's return value
    print("done")

def error(exc):         # error_callback: receives the exception instance
    print("error")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)    # instantiate the pool object

    for i in range(8):
        # pool.apply(func=run)                  # tasks in the pool run serially
        pool.apply_async(func=run, callback=done, error_callback=error)

    pool.close()
    pool.join()
    print("finish....")
Commonly used Pool methods:
apply(func[, args[, kwds]])
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready, so apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool. In other words, apply() submits the task to the pool, but only one worker runs at a time while the caller blocks, which amounts to serial execution.
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
A variant of the apply() method which returns a result object. If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready, callback is applied to it, unless the call failed, in which case error_callback is applied instead.
If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.
Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
In short: the task is submitted to the pool and multiple workers (the number set by the processes argument at instantiation) run concurrently; callback is invoked in the parent process when a worker finishes, and error_callback is invoked in the parent process when a worker raises.
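apply_async() returns an AsyncResult whose get() blocks until the worker finishes; the callback receives the task's return value in the parent. A sketch (the function names are illustrative):

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    results = []
    pool = multiprocessing.Pool(processes=2)

    async_res = pool.apply_async(square, (3,))                # returns an AsyncResult
    pool.apply_async(square, (4,), callback=results.append)   # callback gets the return value

    print(async_res.get())    # blocks until the result is ready, then prints 9
    pool.close()
    pool.join()               # after join, all callbacks have run
    print(results)            # [16]
```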
close()
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed, the worker processes will exit. After calling this method, no new tasks can be submitted to the pool.
terminate()
Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected, terminate() will be called immediately.
join()
Wait for the worker processes to exit. One must call close() or terminate() before using join().
Note: the worker processes in the pool are started (and sit idle, blocked) as soon as the Pool object is created; apply() and apply_async() merely submit tasks to the pool and do not create new processes.