python中multiprocessing、multiprocessing.dummy和threading用法筆記
一、multiprocessing
用法參考地址:multiprocessing用法
首先解釋一個誤區:
程序池的大小是每次同時執行的程序數,但是並不會影響主程序申請程序的數量。主程序申請多程序量不等於池子大小。
1、子程序無返回值
# -*- coding:utf-8 -*- from multiprocessing import Pool as Pool import time def func(msg): print 'msg:', msg time.sleep(2) print 'end:' pool = Pool(processes=3) for i in xrange(1, 5): msg = 'hello %d' % (i) pool.apply_async(func,(msg,)) # 非阻塞 # pool.apply(func,(msg,)) # 阻塞,apply()源自內建函式,用於間接的呼叫函式,並且按位置把元祖或字典作為引數傳入。 # pool.imap(func,[msg,]) # 非阻塞, 注意與apply傳的引數的區別 # pool.map(func, [msg, ]) # 阻塞 print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束 print 'sub-process done'
- 非阻塞方法
multiprocessing.Pool.apply_async() 和 multiprocessing.Pool.imap()
程序併發執行 - 阻塞方法
multiprocessing.Pool.apply()和 multiprocessing.Pool.map()
程序順序執行
2、子程序有返回值
只有apply_async可以有返回值,apply,map,imap不可以設定返回值.
# -*- coding:utf-8 -*- from multiprocessing import Pool as Pool import time def func(msg): print 'msg:', msg time.sleep(2) print 'end:' return msg pool = Pool(processes=3) result = [] for i in xrange(1, 5): msg = 'hello %d' % (i) res = pool.apply_async(func,(msg,)) # 非阻塞 只有apply_async可以有返回值,apply,map,imap不可以設定返回值 result.append(res) print 'Mark~~~~~~~~~~~~~~~' pool.close() pool.join() # 呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束 for res in result: print "sub_process return: ", res.get() print 'sub-process done'
一定要注意res.get()方法是堵塞的。只有子程序執行完畢並返回資料時 res.get()方法才會執行,否則主程序堵塞,並等待。
看下面這個程式: 如何高效處理子程序有返回值的多程序任務
from multiprocessing import Pool import Queue import time def test(p): time.sleep(0.5) if p == 100: return (p,True) else: return (p,False) if __name__ == "__main__": pool = Pool(processes=10) q = Queue.Queue() for i in xrange(500): # 將子程序物件存入佇列中。 q.put( pool.apply_async(test, args=(i,)) ) # 維持執行的程序總數為10,當一個程序執行完後新增新程序. print(i) ''' 因為這裡使用的為pool.apply_async非同步方法,因此子程序執行的過程中,父程序會執行while,獲取返回值並校驗。 ''' print("======", q.qsize()) while 1: a = q.get().get(); print(a) if a[1]: pool.terminate() # 結束程序池中的所有子程序。 break pool.join()
該程式瞬間執行到 print("======", q.qsize()) 行,並且每次執行 a = q.get().get()程式碼時,如果對應程序沒有執行完,即沒有返回輸出值時,該行程式碼導致主程序堵塞等待。
如果需要申請龐大的程序數量時,就會很浪費資源比如下面:
for i in xrange(500000000):
# 將子程序物件存入佇列中。
q.put( pool.apply_async(test, args=(i,)) ) # 維持執行的程序總數為10,當一個程序執行完後新增新程序.
print(i)
我們可以開啟2個執行緒,一個執行緒申請程序,另一個執行緒判斷結束所有子程序的程序是否已經到達。
如下:
from multiprocessing import Pool
import Queue
import threading
import time
def test(p):
time.sleep(0.001)
if p == 10000:
return True
else:
return False
if __name__ == "__main__":
result = Queue.Queue() # 佇列
pool = Pool()
def pool_th():
for i in xrange(50000000000): ##這裡需要建立執行的子程序非常多
try:
result.put(pool.apply_async(test, args=(i,)))
except:
break
def result_th():
while 1:
a = result.get().get() # 獲取子程序返回值
if a:
pool.terminate() # 結束所有子程序
break
'''
利用多執行緒,同時執行Pool函式建立執行子程序,以及執行獲取子程序返回值函式。
'''
t1 = threading.Thread(target=pool_th)
t2 = threading.Thread(target=result_th)
t1.start()
t2.start()
t1.join()
t2.join()
pool.join()
3、多程序共享資源
申請程序有兩種方式一種是multiprocessing.Process(),另一種是multiprocessing.Pool(process=3).apply_async().
multiprocessing提供三種多程序之間共享資料的資料結構: Queue, Array 和Manager.
from multiprocessing import Queue, Array, Manager
Queue、和Array只適用Process類申請的多程序共享資源。
Manager可以適用Pool和Process類申請的多程序共享資源。
import time
from multiprocessing import Manager, Pool
lists = Manager().list() # 定義可被子程序共享的全域性變數lists
def func(i):
# time.sleep(1)
lists.append(i)
print i
pool = Pool(processes=3)
for i in xrange(10000000):
if len(lists) <= 0:
pool.apply_async(func, args=(i,))
else:
break
pool.close()
pool.join()
print(lists)
輸出結果為:且i最大值不定。主程序申請多程序量不等於池子大小。
二、多執行緒 Multiprocessing.dummy
1、子程序無返回值
Multiprocessing.dummy.Pool() 與Multiprocessing.Pool() 的用法一樣
- 非阻塞方法
multiprocessing.dummy.Pool.apply_async() 和 multiprocessing.dummy.Pool.imap()
執行緒併發執行 - 阻塞方法
multiprocessing.dummy.Pool.apply()和 multiprocessing.dummy.Pool.map()
執行緒順序執行
2、子程序有返回值
與多程序一樣,只有multiprocessing.dummy.Pool.apply_async()可以有返回值,apply,map,imap不可以設定返回值.
3、多程序共享資源
三、多執行緒 Threading
1、建立方法
- 直接使用Thread類
from threading import Thread
import time
def run(a = None, b = None) :
print a, b
time.sleep(1)
t = Thread(target = run, args = ("this is a", "thread"))
#此時執行緒是新建狀態
print t.getName()#獲得執行緒物件名稱
print t.isAlive()#判斷執行緒是否還活著。
t.start()#啟動執行緒
t.join()#等待其他執行緒執行結束
- 繼承Thread類
from threading import Thread
import time
class MyThread(Thread) :
def __init__(self, a) :
super(MyThread, self).__init__()
#呼叫父類的構造方法
self.a = a
def run(self) :
print "sleep :", self.a
time.sleep(self.a)
t1 = MyThread(2)
t2 = MyThread(4)
t1.start()
t2.start()
t1.join()
t2.join()