Python多程序併發(multiprocessing)
阿新 • • 發佈:2018-12-27
由於Python下呼叫Linux的Shell命令都需要等待返回,所以常常我們設定的多執行緒都達不到效果,
因此在呼叫shell命令不需要返回時,使用threading模組並不是最好的方法。
Python提供了非常好用的多程序包multiprocessing,你只需要定義一個函式,Python會替你完成其他所有事情。
藉助這個包,可以輕鬆完成從單程序到併發執行的轉換。
1、新建單一程序
如果我們新建少量程序,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."
2、使用程序池
是的,你沒有看錯,不是執行緒池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。
注意要用apply_async,如果落下async,就變成阻塞版本了。
processes=4是最多併發程序數量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."
3、使用Pool,並需要關注結果
更多的時候,我們不僅需要多程序執行,還需要關注每個程序的執行結果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根據網友評論中的反饋,在Windows下執行有可能崩潰(開啟了一大堆新視窗、程序),可以通過如下呼叫來解決:
multiprocessing.freeze_support()
附錄(自己的指令碼):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用程序池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #新增執行緒到執行緒列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #建立多執行緒任務
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待執行緒完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break