python使用多程序
阿新 • • 發佈:2018-12-10
python多執行緒適合IO密集型場景,而在CPU密集型場景,並不能充分利用多核CPU,而協程本質基於執行緒,同樣不能充分發揮多核的優勢。
針對計算密集型場景需要使用多程序,python的multiprocessing與threading模組非常相似,支援用程序池的方式批量建立子程序。
-
建立單個Process程序(使用func)
只需要例項化Process類,傳遞函式給target引數,這點和threading模組非常的類似,args為函式的引數
import os from multiprocessing import Process # 子程序要執行的程式碼 def task(name): print('run child process %s (%s)...' % (name, os.getpid())) if __name__ == '__main__': print('parent process %s.' % os.getpid()) p = Process(target=task, args=('test',)) p.start() p.join() print('process end.')
-
建立單個Process程序(使用class)
繼承Process類,重寫run方法建立程序,這點和threading模組基本一樣
import multiprocessing import os from multiprocessing import current_process class Worker(multiprocessing.Process): def run(self): name = current_process().name # 獲取當前程序的名稱 print('run child process <%s> (%s)' % (name, os.getpid())) print('In %s' % self.name) return if __name__ == '__main__': print('parent process %s.' % os.getpid()) p = Worker() p.start() p.join() print('process end.')
* 停止程序
terminate()結束子程序,但是會導致子程序的資源無法釋放掉,是不推薦的做法,因為結束的時候不清楚子執行緒的執行狀況,有很大可能性導致子執行緒在不恰當的時刻被結束。
import multiprocessing import time def worker(): print('starting worker') time.sleep(0.1) print('finished worker') if __name__ == '__main__': p = multiprocessing.Process(target=worker) print('執行前:', p.is_alive()) p.start() print('執行中:', p.is_alive()) p.terminate() # 傳送停止號 print('停止:', p.is_alive()) p.join() print('等待完成:', p.is_alive())
-
直接建立多個Process程序
import multiprocessing def worker(num): print(f'Worker:%s %s', num) return if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start()
-
使用程序池建立多個程序
在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。當被操作物件數目不大時,可以直接利用multiprocessing中的Process動態成生多個程序,十幾個還好,但如果是上百個,上千個目標,手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。Pool可以提供指定數量的程序供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來它。
import os import random import time from multiprocessing import Pool from time import ctime def task(name): print('start task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) print('end task %s runs %0.2f seconds.' % (name, (time.time() - start))) if __name__ == '__main__': print('parent process %s.' % os.getpid()) p = Pool() # 初始化程序池 for i in range(5): p.apply_async(task, args=(i,)) # 追加任務 apply_async 是非同步非阻塞的,就是不用等待當前程序執行完畢,隨時根據系統排程來進行程序切換。 p.close() p.join() # 等待所有結果執行完畢,會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close() print(f'all done at: {ctime()}')
如果關心每個程序的執行結果,可以使用返回結果的get方法獲取,程式碼如下
import os import random import time from multiprocessing import Pool, current_process from time import ctime def task(name): print('start task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3) print('end task %s runs %0.2f seconds.' % (name, (time.time() - start))) return current_process().name + 'done' if __name__ == '__main__': print('parent process %s.' % os.getpid()) result = [] p = Pool() # 初始化程序池 for i in range(5): result.append(p.apply_async(task, args=(i,))) # 追加任務 apply_async 是非同步非阻塞的,就是不用等待當前程序執行完畢,隨時根據系統排程來進行程序切換。 p.close() p.join() # 等待所有結果執行完畢 for res in result: print(res.get()) # get()函式得出每個返回結果的值 print(f'all done at: {ctime()}')