就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制
阿新 • • 發佈:2020-08-24
守護程序、互斥鎖、程序間通訊(IPC機制)
1、windows:tasklist |findstr 程序id號
2、mac,Linux:ps aux | grep 程序id號
3、程序物件:t = Process(target = task,)或者在程序內部:current_process()
4、t.pid或者current_process().pid 獲取程序id號
5、os.getpid() 同上獲取程序ID號
6、os.getppid() 獲取父程序ID號,子程序中獲取父程序ID,等於父程序的ID號
7、t.is_alive()或者current_process().is_alive() 檢視程序是否存活
8、t.terminate()關閉程序,在主程序關閉
二、守護程序
主程序建立守護程序
其一:守護程序會在主程序程式碼執行結束後就終止
其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children
注意:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止
例:程式碼
from multiprocessing import Process,current_process
import time
import os
def task():
print(os.getpid())print('子程序')
time.sleep(200)
print('子程序結束')
if __name__ == '__main__':
t = Process(target=task, )
# 守護程序:主程序一旦結束,子程序也結束
# t.daemon=True # 一定要加在啟動之前
t.start()
time.sleep(1)
print('主程序結束')
三、互斥鎖
程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或同一個列印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
例:多個遠端共享同一個列印終端
#併發執行,效率高,但競爭同一列印終端,帶來了列印錯亂
from multiprocessing import Process
import os,time
def work():
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
if __name__ == '__main__':
for i in range(3):
p=Process(target=work)
p.start()
#由併發變成了序列,犧牲了執行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(lock):
lock.acquire()
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
lock.release()
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,))
p.start()
例:多個程序共享同一個檔案 檔案檔資料庫,模擬搶票
#檔案db的內容為:{"count":1}
#注意一定要用雙引號,不然json無法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
dic=json.load(open('db.txt'))
print('\033[43m剩餘票數%s\033[0m' %dic['count'])
def get():
dic=json.load(open('db.txt'))
time.sleep(0.1) #模擬讀資料的網路延遲
if dic['count'] >0:
dic['count']-=1
time.sleep(0.2) #模擬寫資料的網路延遲
json.dump(dic,open('db.txt','w'))
print('\033[43m購票成功\033[0m')
def task(lock):
search()
get()
if __name__ == '__main__':
lock=Lock()
for i in range(100): #模擬併發100個客戶端搶票
p=Process(target=task,args=(lock,))
p.start()
檔案db的內容為:{"count":1}
#注意一定要用雙引號,不然json無法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
dic=json.load(open('db.txt'))
print('\033[43m剩餘票數%s\033[0m' %dic['count'])
def get():
dic=json.load(open('db.txt'))
time.sleep(0.1) #模擬讀資料的網路延遲
if dic['count'] >0:
dic['count']-=1
time.sleep(0.2) #模擬寫資料的網路延遲
json.dump(dic,open('db.txt','w'))
print('\033[43m購票成功\033[0m')
def task(lock):
search()
lock.acquire()
get()
lock.release()
if __name__ == '__main__':
lock=Lock()
for i in range(100): #模擬併發100個客戶端搶票
p=Process(target=task,args=(lock,))
p.start()
#加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實現程序間通訊,但問題是:
1.效率低(共享資料基於檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理
#因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。
1 佇列和管道都是將資料存放於記憶體中
2 佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。
四、佇列
程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的
例:程式碼
from multiprocessing import Queue
# 例項化得到要給物件
q=Queue(5) # 預設很大,可以放很多,寫了個5,只能放5個
# 往管道中放值
q.put(1)
q.put('lqz')
q.put(18)
q.put(19)
# q.put(20)
# q.put(21)
# q.put_nowait(100)
# 從管道中取值
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get(timeout=100)) # 等0.1s還沒有值,就結束
# print(q.get_nowait()) # 不等了,有就是有,沒有就沒有
print(q.empty()) # 看一下佇列是不是空的
print(q.full()) # 看一下佇列是不是滿的
# 總結:
'''
q=Queue(佇列大小)
# 放值
q.put(asdf)
q.put_nowait(asdf) # 佇列滿了,放不進去就不放了,報錯
# 取值
q.get() # 從佇列頭部取出一個值
q.get_nowait() # 從佇列頭部取值,沒有就拋錯
# 佇列是否為空,是否滿
print(q.empty()) # 看一下佇列是不是空的
print(q.full()) # 看一下佇列是不是滿的
'''
2、IPC機制(程序間通訊)
# Inter-Process Communication,程序間通訊
from multiprocessing import Process, current_process, Queue
import time
import os
def task1(q):
print('我是task1程序,我的id號是:%s'%os.getpid())
q.put('lqz is handsome')
def task2(q):
# res=q.get()
# print('我是task2程序,我的id號是:%s'%os.getpid(),res)
print('我是task2程序,我的id號是:%s'%os.getpid())
if __name__ == '__main__':
q = Queue(5)
t1 = Process(target=task1, args=(q,))
t1.start()
t2 = Process(target=task2, args=(q,))
t2.start()
print(q.get())