Python併發程式設計 —— 在Python程式中的程序操作
執行中的程式就是一個程序。所有的程序都是通過它的父程序來建立的。因此,執行起來的python程式也是一個程序,那麼我們也可以在程式中再建立程序。多個程序可以實現併發效果,也就是說,當我們的程式中存在多個程序的時候,在某些時候,就會讓程式的執行速度變快。我們可以藉助python中強大的模組。來實現建立程序這個功能。
1、multiprocessing模組
把所有和程序相關的機制都封裝在multiprocessing模組中了(內建模組)。
仔細說來,multiprocess不是一個模組而是python中一個操作、管理程序的包。 之所以叫multi是取自multiple的多功能的意思,在這個包中幾乎包含了和程序有關的所有子模組。大致可以分為四個部分:建立程序部分,程序同步部分,程序池部分,程序之間資料共享。
2、Process模組
(1)、Process模組介紹
process模組是一個建立程序的模組,藉助這個模組,就可以完成程序的建立。
引數介紹:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類例項化得到的物件,表示一個子程序中的任務(尚未啟動) # 強調: 1. 需要使用關鍵字的方式來指定引數 2. args指定的為傳給target函式的位置引數,是一個元組形式,必須有逗號 引數介紹: 1.group 引數未使用,值始終為None 2.target 表示呼叫物件,即子程序要執行的任務 3.args 表示呼叫物件的位置引數元組,args=(1,2,'a',) 4.kwargs 表示呼叫物件的字典,kwargs={'name':'cai','age':20} 5.name 為子程序的名稱
方法介紹:
1、p.start():啟動程序,並呼叫該子程序中的p.run() 2、p.run(): 程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法 3、p.terminate(): 強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖 4、p.is_alive(): 如果p仍然執行,返回True 5、p.join([timeout]): 主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序
屬性介紹:
1、p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定
2、p.name: 程序的名稱
3、p.pid: 程序的pid
4、p.exitcode: 程序在執行時為None、如果為–N,表示被訊號N結束(瞭解即可)
5、p.authkey: 程序的身份驗證鍵,預設是由os.urandom()隨機生成的32字元的字串。這個鍵的用途是為涉及網路連線的底層程序間通訊提供安全性,這類連線只有在具有相同的身份驗證鍵時才能成功(瞭解即可)
在Windows中使用process模組的注意事項:
在Windows作業系統中由於沒有fork(Linux作業系統中建立程序的機制),在建立程序的時候會自動 import 啟動它的這個檔案,而在 import 的時候又執行了整個檔案,因此如果將process()直接寫在檔案中就會無限遞迴建立子程序報錯,所以必須把建立子程序的部分使用 if __name__ == '__main__'判斷保護起來,就不會遞迴運行了。
(2)、使用process模組建立程序
# 建立程序,並檢視主程序和子程序的程序號
import os
import time
from multiprocessing import Process
def func():
'''
在子程序中執行的func
:return:
'''
print('子程序 :',os.getpid(),os.getppid())
time.sleep(3)
if __name__ == '__main__':
p = Process(target=func)
p.start() # start相當於告訴作業系統要開啟一個子程序,而子程序的排程是由作業系統控制的
print('主程序 :',os.getpid())
# 執行結果:先列印主程序id 再列印子程序id (非同步)
# 主程序是在子程序執行完畢之後才結束的,主程序回收子程序的資源
(3)、join 方法: 阻塞,直到對應的子程序物件執行結束
import time
from multiprocessing import Process
def son_process():
time.sleep(4)
print('通知xxx使用者,機器掛掉了')
if __name__ == '__main__':
p = Process(target=son_process)
p.start()
p.join() # 阻塞,直到p對應的子程序物件執行結束
print('所有任務結束')
import time
from multiprocessing import Process
def son_process(n):
print('start', n)
time.sleep(2)
print('end',n)
if __name__ == '__main__':
p_l = []
for i in range(10):
p = Process(target=son_process,args=(i,))
p.start() # start相當於告訴作業系統要開啟一個子程序,而子程序的排程是由作業系統控制的
p_l.append(p)
for p in p_l:p.join() # join 如果執行這句話的時候子程序已經結束了,那麼join就不阻塞了
print('所有任務結束')
(4)、多個子程序
import os
import time
from multiprocessing import Process
def son_process():
print('strat son',os.getpid())
time.sleep(1)
print('end son')
if __name__ == '__main__':
print(os.getpid())
for i in range(5):
Process(target=son_process).start()
# 多個子程序同時執行時,子程序的執行順序不是根據啟動順序決定的
(5)、守護程序
守護程序會隨著父程序的程式碼結束而結束
主程序建立守護程序
一:守護程序會在主程序程式碼執行結束後就終止
二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children
注意:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止
import time
from multiprocessing import Process
def son():
while True:
time.sleep(1)
print('in son')
if __name__ == '__main__':
p = Process(target=son)
p.daemon = True # 將當前的子程序設定為守護程序
# 一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,並且父程序程式碼執行結束,p即終止執行
p.start()
time.sleep(5)
正常情況下,父程序永遠會等著子程序結束,子程序結束之後,父程序才會結束
如果設定了守護程序,父程序的程式碼結束之後,守護程序也跟著結束
# 注:
程式碼結束和程序結束是兩回事
# 沒設定守護程序時:
子程序的程式碼和主程序的程式碼自己執行自己的,相互之間沒關係
如果主程序的程式碼先結束,主程序不結束,等子程序程式碼結束,回收子程序的資源,主程序才結束
如果子程序的程式碼先結束,主程序邊回收子程序的資源邊執行自己的程式碼,當代碼和資源都回收結束,主程序才結束
# 設定了守護程序
子程序的程式碼和主程序的程式碼自己執行自己的,互相之間沒關係
一旦主程序的程式碼先結束,主程序會先結束掉子程序,然後回收資源,然後主程序才結束
守護程序不會守護除了主程序程式碼之外的其他子程序
import time
from multiprocessing import Process
def son():
while True:
time.sleep(1)
print('in son')
def son2():
print('start son2')
time.sleep(10)
print('end son2')
if __name__ == '__main__':
p = Process(target=son)
p.daemon = True
p.start()
Process(target=son2).start()
time.sleep(5)
如果偏要求守護程序在子程序結束之後才結束
import time
from multiprocessing import Process
def son():
while True:
time.sleep(1)
print('in son')
def son2():
print('start son2')
time.sleep(10)
print('end son2')
if __name__ == '__main__':
p = Process(target=son)
p.daemon = True
p.start()
p = Process(target=son2)
p.start()
time.sleep(5)
p.join() # 所有的子程序都執行完,才不會阻塞
(6)、Process物件中的其他方法
import time
from multiprocessing import Process
def son():
while True:
time.sleep(1)
print('in son')
if __name__ == '__main__':
p = Process(target=son)
p.start()
time.sleep(5)
print(p.is_alive())
p.terminate() # 非同步非阻塞操作 # 強制終止程序p
# 關閉程序,不會立即關閉,所以is_alive 立刻 檢視的結果可能還是存活
time.sleep(0.1)
print(p.is_alive()) # 判斷子程序是否存活
print('主程序的程式碼並不結束')
(7)、面向物件的方式開啟子程序:(繼承Process類的形式開啟程序的方式)
import os
from multiprocessing import Process
class MyProcess(Process):
def run(self):
print(os.getpid())
if __name__ == '__main__':
print('主 :',os.getpid())
MyProcess().start() # start會自動呼叫run
(8)、傳引數
import os
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,name): # 執行父類的init
super().__init__()
self.name=name
def run(self):
print(os.getpid())
print('%s 在工作' %self.name)
if __name__ == '__main__':
print('主 :',os.getpid())
MyProcess('yong').start()
MyProcess('liang').start()
(9)、判斷程序之間的資料是否隔離
from multiprocessing import Process
n = 0
def son():
global n
n += 1
if __name__ == '__main__':
p_l = []
for i in range(20):
p = Process(target=son)
p.start()
p_l.append(p)
for p in p_l:p.join()
print(n)
3、資料安全(鎖):用來保證資料安全
如果多個程序同時對一個檔案進行操作會出現什麼問題?
實現了程式的非同步,讓多個任務可以同時在幾個程序中併發處理,但它們之間的執行沒有順序,一旦開啟也不受我們控制。儘管併發程式設計讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題:當多個程序使用同一份資料資源的時候,就會引發資料安全或順序混亂問題。
# 多程序搶佔輸出資源
import os
import time
import random
from multiprocessing import Process
def work(n):
print('%s: %s is running' %(n,os.getpid()))
time.sleep(random.random())
print('%s:%s is done' %(n,os.getpid()))
if __name__ == '__main__':
for i in range(3):
p=Process(target=work,args=(i,))
p.start()
# 使用鎖維護執行順序
# 由併發變成了序列,犧牲了執行效率,但避免了競爭
import os
import time
import random
from multiprocessing import Process, Lock
def work(lock,n):
lock.acquire() # 給這段程式碼上鎖
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s: %s is done' % (n, os.getpid()))
lock.release() # 給這段程式碼解鎖
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,i))
p.start()
# 上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程式又重新變成串行了,這樣確實會浪費了時間,卻保證了資料的安全。
# 在使用程序鎖時同時要注意死鎖問題(多次引用互斥鎖Lock),可以使用遞迴鎖(RLock)
from multiprocessing import Process,Lock
def change(lock):
print('一部分併發的程式碼,多個程序之間互相不干擾的執行著')
lock.acquire() # 給這段程式碼上鎖
with open('file','r') as f:
content = f.read()
num = int(content)
num += 1
for i in range(1000000):i+=1
with open('file','w') as f:
f.write(str(num))
lock.release() # 給這段程式碼解鎖
print('另一部分併發的程式碼,多個程序之間互相不干擾的執行著')
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=change,args=(lock,)).start()
當多個程序同時操作檔案 / 共享一些資料的時候就會出現資料不安全(讀資料: 可以同時讀, 寫資料: 不能同時寫)
當開啟多個程序,同時執行10000000行程式碼,其中有20行程式碼涉及到了操作同一個檔案,只給這20行程式碼枷鎖,來保證資料的安全。
# 加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
# 雖然可以用檔案共享資料實現程序間通訊,但問題是:
1.效率低(共享資料基於檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理
# 因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。
佇列和管道都是將資料存放於記憶體中
佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。
4、程序之間的資料共享(程序之間通訊)
(1)、程序之間通訊 —— IPC (Inter Process Communication)
內建的模組實現的機制: 佇列 \ 管道
第三方工具 : redis rabbitMQ memcache
(2)、佇列
建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。
Queue([maxsize])
建立共享的程序佇列。
引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。
底層佇列使用管道和鎖定實現。
# 佇列Queue = 管道Pipe + 鎖
# Pipe 基於檔案實現的(socket + pickle) = 資料不安全
# Queue 基於檔案(socket + pickle) + 鎖(lock) 實現的 = 資料安全
# 在程序之間維護資料的安全 --- 程序安全
# 佇列是程序安全的(程序佇列保證了程序的資料安全)
# 佇列都是先進先出的
佇列中的方法介紹
Queue([maxsize])
建立共享的程序佇列。maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。底層佇列使用管道和鎖定實現。另外,還需要執行支援執行緒以便佇列中的資料傳輸到底層管道中。
Queue的例項q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True. 如果設定為False,將引發Queue.Empty異常(定義在Queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設定為False,將引發Queue.Empty異常(定義在Queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize()
返回佇列中目前專案的正確數量。此函式的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案。
q.full()
如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。。
q.close()
關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。
q.cancel_join_thread()
不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()方法阻塞。
q.join_thread()
連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()方法可以禁止這種行為。
程式碼示例
'''
multiprocessing模組支援程序間通訊的兩種主要形式:管道和佇列,都是基於訊息傳遞實現的
'''
from multiprocessing import Queue
q=Queue(3)
# put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3) # 如果佇列已經滿了,程式就會停在這裡,等待資料被別人取走,再將資料放入佇列。
# 如果佇列中的資料一直不被取走,程式就會永遠停在這裡。
# put是一個同步阻塞方法,會阻塞直到佇列不滿
try:
q.put_nowait(3) # 可以使用put_nowait,如果佇列滿了不會阻塞,但是會因為佇列滿了而報錯。
# put_nowait 同步非阻塞方法
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丟掉這個訊息。
print('佇列已經滿了')
# 因此,我們再放入資料之前,可以先看一下佇列的狀態,如果已經滿了,就不繼續put了。
print(q.full()) # 滿了 # 檢視當前佇列是否已滿
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果佇列已經空了,那麼繼續取就會出現阻塞。
# get是一個同步阻塞方法,會阻塞直到資料來
try:
q.get_nowait(3) # 可以使用get_nowait,如果佇列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。
print('佇列已經空了')
print(q.empty()) # 空了 # 檢視當前佇列是否已空
ret = q.qsize() # 檢視當前佇列有多少值
父程序傳送資料給子程序
from multiprocessing import Queue,Process
def son(q):
print(q.get()) # 呼叫主函式中p程序傳遞過來的程序引數
if __name__ == '__main__':
q = Queue() # 建立一個Queue物件
p = Process(target=son,args=(q,)) # 建立一個程序
p.start()
q.put(123) # put函式為向佇列中新增一條資料
# 上面是一個queue的簡單應用,使用佇列q物件呼叫get函式來取得佇列中的資料。
批量生產資料放入佇列再批量獲取結果
import os
import time
import multiprocessing
# 向queue中輸入資料的函式
def inputQ(queue):
info = str(os.getpid()) + '(put):' + str(time.asctime())
queue.put(info)
# 向queue中輸出資料的函式
def outputQ(queue):
info = queue.get()
print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))
# Main
if __name__ == '__main__':
multiprocessing.freeze_support()
record1 = [] # store input processes
record2 = [] # store output processes
queue = multiprocessing.Queue(3)
# 輸入程序
for i in range(10):
process = multiprocessing.Process(target=inputQ,args=(queue,))
process.start()
record1.append(process)
# 輸出程序
for i in range(10):
process = multiprocessing.Process(target=outputQ,args=(queue,))
process.start()
record2.append(process)
for p in record1:
p.join()
for p in record2:
p.join()
5、程序池和 multiprocess.Pool 模組
(1)、程序池
# 為什麼要有程序池?
在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序嗎?首先,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。因此引入了程序池。
# 程序池的概念
定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。
(2)、multiprocess.Pool模組
Pool([numprocess [,initializer [, initargs]]]): 建立程序池
1、numprocess: 要建立的程序數,如果省略,將預設使用cpu_count()的值
2、initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
3、initargs:是要傳給initializer的引數組
方法介紹:
p.apply(func [, args [, kwargs]]): 在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()'''
p.apply_async(func [, args [, kwargs]]): 在一個池工作程序中執行func(*args,**kwargs),然後返回結果。
'''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。'''
p.close(): 關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成
P.jion(): 等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫
# 其他方法:方法apply_async()和map_async()的返回值是AsyncResul的例項obj。例項具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發異常。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。
obj.ready():如果呼叫完成,返回True
obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函式
程式碼示例:
# 程序池的同步呼叫
import os,time
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) # 程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) # 同步呼叫,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞
# 但不管該任務是否存在阻塞,同步呼叫都會在原地等著
print(res_l)
# 程序池的非同步呼叫
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(random.random())
return n**2
if __name__ == '__main__':
p=Pool(3) # 程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) # 非同步執行,根據程序池中有的程序數,每次最多3個子程序在非同步執行
# 返回結果之後,將結果放入列表,歸還程序,之後再執行新的任務
# 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束
# 而是執行完一個就釋放一個程序,這個程序就去接收新的任務。
res_l.append(res)
# 非同步apply_async用法:如果使用非同步提交的任務,主程序需要使用jion,等待程序池內任務都處理完,然後可以用get收集結果
# 否則,主程序結束,程序池可能還沒來得及執行,也就跟著一起結束了
p.close()
p.join()
for res in res_l:
print(res.get()) # 使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get
6、使用 ProcessPoolExecutor 實現程序池
import time
from concurrent.futures import ProcessPoolExecutor
def task(n1, n2):
time.sleep(2)
print('任務')
if __name__ == '__main__':
# 建立程序池
pool = ProcessPoolExecutor(10)
for i in range(20):
pool.submit(task, i, 1)
print('END')