python-->多程序
pythn 多程序
python中的多執行緒無法利用多核優勢,若想要充分地使用多核CPU的資源(os.cpu_count()檢視),在python中大部分情況需要使用多程序。Python提供了multiprocessing模組用來開啟子程序,並在子程序中執行我們定製的任務(比如函式),該模組與多執行緒模組threading的程式設計介面類似。multiprocessing模組的功能眾多:支援子程序、通訊和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等元件。同時需要注意的是:程序與執行緒不同,程序沒有任何共享狀態,程序修改的資料,改動僅限於該程序內。
ps:詳細理論:http://www.cnblogs.com/linhaifeng/articles/7430066.html
Process類
建立程序的類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類例項化得到的物件,表示一個子程序中的任務(尚未啟動)。需要使用關鍵字的方式來指定引數:args指定的為傳給target函式的位置引數,是一個元組形式,必須有逗號View Code
引數介紹
group引數未使用,值始終為None target表示呼叫物件,即子程序要執行的任務 args表示呼叫物件的位置引數元組,args=(1,2,'egon',) kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18} name為子程序的名稱
方法介紹
p.start():啟動程序,並呼叫該子程序中的p.run() p.run():程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法 p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖 p.is_alive():如果p仍然執行,返回True p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序
屬性介紹
p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定
p.name:程序的名稱
p.pid:程序的pid
p.exitcode:程序在執行時為None、如果為–N,表示被訊號N結束(瞭解即可)
p.authkey:程序的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字串。這個鍵的用途是為涉及網路連線的底層程序間通訊提供安全性,這類連線只有在具有相同的身份驗證鍵時才能成功
程序呼叫
在windows中Process()必須放到# if __name__ == '__main__':下,由於Windows沒有fork,多處理模組啟動一個新的Python程序並匯入呼叫模組。 如果在匯入時呼叫Process(),那麼這將啟動無限繼承的新程序(或直到機器耗盡資源)。 這是隱藏對Process()內部呼叫的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在匯入時被呼叫
import time import random from multiprocessing import Process def test(name): print('%s piaoing' %name) time.sleep(random.randrange(1,5)) print('%s piao end' %name) #必須加,號 pA=Process(target=test,args=('jack',)) pB=Process(target=test,args=('tom',)) pC=Process(target=test,args=('sun',)) pD=Process(target=test,args=('rua',)) pA.start() pB.start() pC.start() pD.start() print('主程序...')方法一
import time import random from multiprocessing import Process class Test(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print('%s piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s piao end' %self.name) pA=Piao('jack') pB=Piao('tom') pC=Piao('sun') pD=Piao('rua') #start會自動呼叫run pA.start() pB.start() pC.start() pD.start() print('主程序...')方法二
from multiprocessing import Process #在windows系統中應該把全域性變數定義在if __name__ == '__main__'之上就可以了 n=100 def work(): global n n=0 print('子程序內...: ',n) if __name__ == '__main__': p=Process(target=work) p.start() print('主程序內...: ',n)程序記憶體空間隔離
join()方法
from multiprocessing import Process import time import random def Test(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) pA=Process(target=piao,args=('jack',)) pB=Process(target=piao,args=('tom',)) pC=Process(target=piao,args=('sun',)) pD=Process(target=piao,args=('rua',)) pA.start() pB.start() pC.start() pD.start() ''' 疑問? 既然join是等待程序結束,那麼按照下面程式碼的寫法,程序不就是序列的了嗎?事實並不是這樣的,首先必須明確:p.join()是讓誰等?很明顯p.join()是讓主執行緒等待p的結束,卡住的是主執行緒而絕非程序p,所以程序只要start就會在開始運行了,所以pA-pD.start()時,系統中已經有四個併發的程序了而pA.join()是在等pA結束,沒錯pA只要不結束主執行緒就會一直卡在原地等待,這也是問題的關鍵即join是讓主執行緒等,而pA-pD仍然是併發執的,pA.joi的時候,其餘pB,pC,pD仍然在執行,等pA.join結束,可能pB,pC,pD早已經結束了,pB.join,pC.join.pD.join直接通過檢測,無需等待所以4個join花費的總時間仍然是耗費時間最長的那個程序執行的時間 ''' pA.join() pB.join() pC.join() pD.join() print('主執行緒....') ''' 啟動程序與join程序可以簡寫為 p_l=[p1,p2,p3,p4] for p in p_l: p.start() for p in p_l: p.join() ''' from multiprocessing import Process import time import random class Test(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Test('egon') p.start() #等待p停止,等0.0001秒就不再等了 p.join(0.0001) print('開始...')View Code
守護程序
主程序建立守護程序p.daemon=True方法,守護程序會在主程序程式碼執行結束後就終止。守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children
ps:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止
from multiprocessing import Process import time import random class Test(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Test('World') #一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,並且父程序程式碼執行結束,p即終止執行 p.daemon=True p.start() print('mian processing...') 主程序程式碼執行完畢,守護程序就會結束 from multiprocessing import Process from threading import Thread import time def foo(): print("startfoo...") time.sleep(1) print("endfoo...") def bar(): print("startbar...") time.sleep(3) print("endbar...") pA=Process(target=foo) pb=Process(target=bar) pA.daemon=True pA.start() pB.start() print("main processing...")View Code
程序同步(鎖)
程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或同一個列印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
*併發執行,效率高,但競爭同一列印終端,出現列印錯亂 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start() *併發變成了序列,犧牲了執行效率,避免錯誤 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()View Code
佇列
程序彼此之間互相隔離,要實現程序間通訊(IPC),可以使用multiprocessing模組佇列,這種方式都是使用訊息傳遞的,建立佇列的類(底層就是以管道和鎖定的方式實現
佇列方法
q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。
q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():呼叫此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中又加入了專案。
q.full():呼叫此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果佇列中的專案被取走。
q.qsize():返回佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) print(q.get()) print(q.get()) print(q.get()) print(q.empty())View Code
生產者與消費者
在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。
為什麼要使用生產者和消費者模式?
線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什麼是生產者消費者模式?
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
基於佇列實現生產者消費者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c=Process(target=consumer,args=(q,)) #開始 p.start() c.start() print('主程序...') 存在問題:主程序永遠不會結束,因為生產者p在生產完後就結束,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。 解決方式是讓生產者在生產完畢後,往佇列中再發一個結束訊號,使得消費者在接收到結束訊號後可以break出死迴圈。 解決方法: JoinableQueue([maxsize]):與Queue物件相似,但其允許專案的使用者通知生成者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。 引數介紹: maxsize是佇列中允許最大項數,省略則無大小限制。 方法介紹: JoinableQueue的例項p除了與Queue物件相同的方法之外還具有: q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常 q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止 from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) #向q.join()傳送一次訊號,證明一個數據已經被取走了 q.task_done() def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() if __name__ == '__main__': q=JoinableQueue() #生產者 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主程序...')
資料共享
程序間資料是獨立的,但是可以藉助於佇列或管道(不推薦使用)實現通訊,它們都是基於訊息傳遞的。雖然程序間資料獨立,但也可以通過Manager實現資料共享
from multiprocessing import Manager,Process,Lock import os def work(d,lock): # with lock: #不加鎖而操作共享的資料,肯定會出現資料錯亂 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_list=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_list.append(p) p.start() for p in p_list: p.join() print(dic)View Code
訊號量
互斥鎖同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料,雖然訊號量與程序池的概念很像,但訊號量涉及到加鎖的概念,所以應該區別對待
from multiprocessing import Process,Semaphore import time,random def go_wc(sem,user): sem.acquire() print('%s 佔到一個座位' %user) #模擬每個人坐座位的速度,0代表有的人坐下就起來 time.sleep(random.randint(0,3)) sem.release() if __name__ == '__main__': sem=Semaphore(5) p_list=[] for i in range(13): p=Process(target=go_wc,args=(sem,'user%s' %i,)) p.start() p_list.append(p) for i in p_list: i.join() print('--------------->>')View Code
事件
python程序程的事件用於主執行緒控制其他執行緒的執行,事件主要提供了三個方法 set、wait、clear。事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
- clear:將“Flag”設定為False
- set:將“Flag”設定為True
from multiprocessing import Process,Event import time,random def car(e,n): while True: if not e.is_set(): #Flase print('\033[31m紅燈亮\033[0m,car%s等著' %n) e.wait() print('\033[32m車%s 看見綠燈亮了\033[0m' %n) time.sleep(random.randint(3,6)) if not e.is_set(): continue print('走你,car', n) break def police_car(e,n): while True: if not e.is_set(): print('\033[31m紅燈亮\033[0m,car%s等著' % n) e.wait(1) print('燈的是%s,警車走了,car %s' %(e.is_set(),n)) break def traffic_lights(e,inverval): while True: time.sleep(inverval) if e.is_set(): e.clear() #e.is_set() ---->False else: e.set() if __name__ == '__main__': e=Event() # for i in range(10): # p=Process(target=car,args=(e,i,)) # p.start() for i in range(5): p = Process(target=police_car, args=(e, i,)) p.start() t=Process(target=traffic_lights,args=(e,10)) t.start() print('---------------->>')View Code
程序池
建立程序池
Pool([numprocess [,initializer [, initargs]]]) 引數介紹: numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None initargs:是要傳給initializer的引數組 主要方法 p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async() p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。 p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成 P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫
程序池應用
同步呼叫apply from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務 p=Pool(3) res_l=[] for i in range(10): #同步呼叫,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞,但不管該任務是否存在阻塞,同步呼叫都會在原地等著,只是等的過程中若是任務發生了阻塞就會被奪走cpu的執行許可權 res=p.apply(work,args=(i,)) res_l.append(res) print(res_l) 非同步呼叫 apply_async from multiprocessing import Pool import os,time def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務 p=Pool(3) res_l=[] for i in range(10): #同步執行,阻塞、直到本次任務執行完畢拿到res res=p.apply_async(work,args=(i,)) res_l.append(res) #非同步apply_async用法:如果使用非同步提交的任務,主程序需要使用jion,等待程序池內任務都處理完,然後可以用get收集結果,否則,主程序結束,程序池可能還沒來得及執行,也就跟著一起結束了 p.close() p.join() for res in res_l: #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get print(res.get()) 同步呼叫與非同步呼叫 *使用程序池(非同步呼叫,apply_async) #coding: utf-8 from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply_async(func, (msg, )) #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去 res_l.append(res) print("==============================>") #沒有後面的join,或get,則程式整體結束,程序池中的任務還沒來得及全部執行完也都跟著主程序一起結束了 pool.close() #關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成 pool.join() #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束 print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>物件組成的列表,而非最終的結果,但這一步是在join後執行的,證明結果已經計算完畢,剩下的事情就是呼叫每個物件下的get方法去獲取結果 for i in res_l: print(i.get()) #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get *使用程序池(同步呼叫,apply) from multiprocessing import Process,Pool import time def func(msg): print( "msg:", msg) time.sleep(0.1) return msg if __name__ == "__main__": pool = Pool(processes = 3) res_l=[] for i in range(10): msg = "hello %d" %(i) res=pool.apply(func, (msg, )) #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去 res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另外一個 print("==============================>") pool.close() pool.join() #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束 print(res_l) #看到的就是最終的結果組成的列表 for i in res_l: #apply是同步的,所以直接得到結果,沒有get()方法 print(i)
回撥函式
需求場景:程序池中任何一個任務一旦處理完了則立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函式去處理該結果,該函式即回撥函式,可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函式(主程序負責執行),這樣主程序在執行回撥函式時就省去了I/O的過程,直接拿到的是任務的結果
from multiprocessing import Pool import requests import json import os def get_page(url): print('<程序%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def pasrse_page(res): print('<程序%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] p=Pool(3) res_l=[] for url in urls: res=p.apply_async(get_page,args=(url,),callback=pasrse_page) res_l.append(res) p.close() p.join() print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回撥函式處理了 ------------------------------------------------------ <程序3388> get https://www.baidu.com <程序3389> get https://www.python.org <程序3390> get https://www.openstack.org <程序3388> get https://help.github.com/ <程序3387> parse https://www.baidu.com <程序3389> get http://www.sina.com.cn/ <程序3387> parse https://www.python.org <程序3387> parse https://help.github.com/ <程序3387> parse http://www.sina.com.cn/ <程序3387> parse https://www.openstack.org [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() res=requests.get('http://maoyan.com/board/7') print(re.findall(pattern,res.text))爬蟲案例
無需回撥函式
ps:若主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函式
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待程序池中所有程序執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到所有結果 print(nums) #主程序拿到所有的處理結果,可以在主程序中進行統一進行處理