PythonI/O進階學習筆記_11.python的多程序
阿新 • • 發佈:2020-01-23
content:
1. 為什麼要多程序程式設計?和多執行緒有什麼區別?
2. python 多程序程式設計
3. 程序間通訊
=======================================
一. 為什麼要多程序程式設計?和多執行緒有什麼區別?
由於GIL的存在,所以對於某一些多執行緒任務來說,無法利用多核的優勢,對這些耗cpu的任務,用多程序反而能利用多cpu。
所以多cpu的操作用多程序程式設計。
對io操作較多的任務來說,瓶頸不在於cpu,更多的在於io的切換中的消耗和時間等待。用多執行緒反而能在io掛起的時候,進行執行緒切換。
雖然io操作多的時候,也可以用多程序程式設計,但是因為程序的切換系統的代價是十分大的,所以能使用多執行緒的情況下,儘量用多執行緒。
所以,對於耗費cpu的操作,比如計算、挖礦等,多程序優於多執行緒。
例:同計算一組斐波拉契數列的時間比較(耗cpu的操作)
#多執行緒 from concurrent.futures import ThreadPoolExecutor,as_completed from concurrent.futures import ProcessPoolExecutor import time def fib(n): if n <= 2: return 1 return fib(n-1)+fib(n-2) with ThreadPoolExecutor(3) as excutor: all_task=[excutor.submit(fib,(num)) for num in range(25,35)] start_time=time.time() for future in as_completed(all_task): data=future.result() print("result:{}".format(data)) end_time=time.time() print("last time : {}".format(end_time-start_time)) #output: result:75025 result:121393 result:196418 result:317811 result:514229 result:832040 result:1346269 result:2178309 result:3524578 result:5702887 last time : 98.66604399681091
#多程序 from concurrent.futures import ThreadPoolExecutor,as_completed from concurrent.futures import ProcessPoolExecutor import time def fib(n): if n <= 2: return 1 return fib(n-1)+fib(n-2) if __name__ == "__main__": with ProcessPoolExecutor(3) as excutor: all_task = [excutor.submit(fib, (num)) for num in range(25, 35)] start_time = time.time() for future in as_completed(all_task): data = future.result() print("result:{}".format(data)) end_time = time.time() print("last time : {}".format(end_time - start_time)) #output: result:75025 result:121393 result:196418 result:317811 result:514229 result:832040 result:1346269 result:2178309 result:3524578 result:5702887 last time : 14.470988988876343
程序和執行緒的區別:
- 程序是資源分配的最小單位,執行緒是程式執行的最小單位。
- 程序有自己的獨立地址空間,每啟動一個程序,系統就會為它分配地址空間,建立資料表來維護程式碼段、堆疊段和資料段,這種操作非常昂貴。而執行緒是共享程序中的資料的,使用相同的地址空間,因此CPU切換一個執行緒的花費遠比程序要小很多,同時建立一個執行緒的開銷也比程序要小很多。
- 執行緒之間的通訊更方便,同一程序下的執行緒共享全域性變數、靜態變數等資料,而程序之間的通訊需要以通訊的方式(IPC)進行。不過如何處理好同步與互斥是編寫多執行緒程式的難點。
- 但是多程序程式更健壯,多執行緒程式只要有一個執行緒死掉,整個程序也死掉了,而一個程序死掉並不會對另外一個程序造成影響,因為程序有自己獨立的地址空間。
#input from concurrent.futures import ProcessPoolExecutor import multiprocessing #多程序程式設計 import time def get_html(n): time.sleep(n) print("sub_progress sccess") if __name__=="__main__": progress = multiprocessing.Process(target=get_html,args=(3,)) print(progress.pid) progress.start() print(progress.pid) progress.join() #output None 12864 sub_progress sccess
3.繼承Progress類(與之前的Thread類一樣)
import multiprocessing #多程序程式設計 import time class progress_get_html(multiprocessing.Process): def __init__(self,n): self.n=n super().__init__() def run(self): time.sleep(self.n) print("sub progress success") class MyProgress(multiprocessing.Process): def __init__(self,n): self.n=n super().__init__() def run(self): pro=progress_get_html(self.n) pro.start() print("progress end") if __name__=="__main__": progress = MyProgress(3) print(progress.pid) progress.start() print(progress.pid) progress.join() #output: None 8744 progress end sub progress success
4.使用程序池 指明程序數,不指明的話,可以直接預設為cpu數(cpu_count() or 1)。
from concurrent.futures import ProcessPoolExecutor from multiprocessing import pool import multiprocessing #多程序程式設計 import time def get_html(n): time.sleep(n) print("sub_progress sccess") return n if __name__=="__main__": pool=multiprocessing.Pool(multiprocessing.cpu_count()) result=pool.apply_async(get_html,args=(3,)) print(result.get()) #pool在呼叫join之前 需要呼叫close 來讓它不再接收任務。否則會報錯 pool.close() pool.join() print(result.get()) #output sub_progress sccess 3 3
其他方法: - imap:按照引數輸入順序
if __name__=="__main__": pool=multiprocessing.Pool(multiprocessing.cpu_count()) for result in pool.imap(get_html,[1,5,3]): print("sleep {} successed ".format(result)) #output: sub_progress sccess sleep 1 successed sub_progress sccess sub_progress sccess sleep 5 successed sleep 3 successedimap_unordered: 按照執行完成順序
if __name__=="__main__": pool=multiprocessing.Pool(multiprocessing.cpu_count()) #for result in pool.imap(get_html,[1,5,3]): # print("sleep {} successed ".format(result)) for result in pool.imap_unordered(get_html,[1,5,3]): print("sleep {} successed ".format(result)) #output: sub_progress sccess sleep 1 successed sub_progress sccess sleep 3 successed sub_progress sccess sleep 5 successed
三. 程序間通訊 與執行緒間不同的是,執行緒間同步的類和鎖是不可用的。 1.Queue(注意是multiprocessing而不是thread的)
from multiprocessing import Process,Queue import time def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data=queue.get() print(data) if __name__== "__main__": queue=Queue(10) my_producer = Process(target=producer,args=(queue,)) my_consumer = Process(target=consumer,args=(queue,)) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join() #outpu: a注意:multprocess中的Queue是不能用於pool程序池的 2.Manager(與程序池共用) Manager中有個Queue,如果像實現pool中的程序間通訊,需要使用Manager中的Queue。
from multiprocessing import Process,pool,Manager,Pool import time def producer(queue): queue.put("a") time.sleep(2) def consumer(queue): time.sleep(2) data=queue.get() print(data) if __name__== "__main__": queue=Manager().Queue() pool=Pool(3) pool.apply_async(producer,args=(queue,)) pool.apply_async(consumer,args=(queue,)) pool.close() pool.join() #output: a3.管道pipe pipe只能適用於兩個指定的程序。 pipe的效能高於queue的,queue加了很多的鎖操作。
from multiprocessing import Process,pool,Manager,Pool,Pipe import time def producer(pipe): pipe.send("hello") def consumer(pipe): print(pipe.recv()) if __name__== "__main__": recv_pipe,send_pipe=Pipe() my_producer=Process(target=producer,args=(send_pipe,)) my_consumer=Process(target=consumer,args=(recv_pipe,)) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join() #output: hello
4.程序間共享記憶體操作 Mnager的dict、list、value等。
from multiprocessing import Process,pool,Manager,Pool,Pipe import time def add_data(p_dict,key,value): p_dict[key]=value if __name__ == "__main__": progress_dict= Manager().dict() first_progress= Process(target=add_data,args=(progress_dict,"name","tangrong")) second_progress = Process(target=add_data,args=(progress_dict,"age","18")) first_progress.start() second_progress.start() first_progress.join() second_progress.join() print(progress_dict) #output: {'name': 'tangrong', 'age': '18'}在使用的時候,可以用Manager中的資料結構,但是注意資料同步(LOCK,RLOCK等)