Process join方法 以及其他屬性
Process join方法 以及其他屬性
在主進程運行過程中如果想並發地執行其他的任務,我們可以開啟子進程,此時主進程的任務與子進程的任務分兩種情況
情況一:在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢後,主進程還需要等待子進程執行完畢,然後統一回收資源。
情況二:如果主進程的任務在執行到某一個階段時,需要等待子進程執行完畢後才能繼續執行,就需要有一種機制能夠讓主進程檢測子進程是否運行完畢,在子進程執行完畢後才繼續執行,否則一直在原地阻塞,這就是join方法的作用
join的用法:
from multiprocessing import Process import time import random import os def task(): print('%s is piaoing' %os.getpid()) time.sleep(random.randrange(1,3)) print('%s is piao end' %os.getpid()) if __name__ == '__main__': p=Process(target=task) p.start() p.join() #等待p停止,才執行下一行代碼 print('主')
會有很多人問有了join是不是程序就會變成穿行了呢,這個你需要明確一點,join這是讓主進程等待,子進程是都在運行的,同時啟動4個進程,那麽四個進程都會同時執行!
from multiprocessing import Process import time import random def task(name): print('%s is piaoing' %name) time.sleep(random.randint(1,3)) print('%s is piao end' %name) if __name__ == '__main__': p1=Process(target=task,args=('egon',)) p2=Process(target=task,args=('alex',)) p3=Process(target=task,args=('yuanhao',)) p4=Process(target=task,args=('wupeiqi',)) p1.start() p2.start() p3.start() p4.start() 既然join是等待進程結束, 那麽我像下面這樣寫, 進程不就又變成串行的了嗎? # 當然不是了, 必須明確:p.join()是讓誰等? # 很明顯p.join()是讓主線程等待p的結束,卡住的是主進程而絕非子進程p, p1.join() p2.join() p3.join() p4.join() print('主')
上面的啟動進程可以簡化為
p_l=[p1,p2,p3,p4]
for p in p_l:
p.start()
for p in p_l:
p.join()
Process對象的其他屬性或方法
import multiprocessing import time # 開啟進程的 第一種方式 def hi(name, x): print("start time", time.ctime()) print("hi %s" % name) time.sleep(x) print(" %s end time " % name, time.ctime()) if __name__ == '__main__': p1 = multiprocessing.Process(target=hi, args=("alex", 3), name="alex Process") # 給進程起個名字 p2 = multiprocessing.Process(target=hi, args=("egon", 2)) p1.start() print(p1.is_alive()) # 查看進程是否存活 print(p1.name) # 查看進程的名字 生成進程的時候可以取名字 p1.terminate() # 告訴操作系統 幹死p1 p2.start() p1.join() p2.join() print("主進程") print(p1.is_alive())
守護進程
主進程創建子進程,然後將該進程設置成守護自己的進程,守護進程就好比崇禎皇帝身邊的老太監,崇禎皇帝已死老太監就跟著殉葬了。
關於守護進程需要強調兩點:
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
如果我們有兩個任務需要並發執行,那麽開一個主進程和一個子進程分別去執行就ok了,如果子進程的任務在主進程任務結束後就沒有存在的必要了,那麽該子進程應該在開啟前就被設置成守護進程。主進程代碼運行結束,守護進程隨即終止。
import multiprocessing
import time
# 開啟進程的 第一種方式
def hi(name, x):
print("start time", time.ctime())
print("hi %s" % name)
time.sleep(x)
print(" %s end time " % name, time.ctime())
if __name__ == '__main__':
p = multiprocessing.Process(target=hi, args=("alex", 2))
p1 = multiprocessing.Process(target=hi, args=("egon", 3))
p.daemon = True
p.start()
p1.start()
time.sleep(1)
print("zhu died !", ) # 進程不會等他開的子進程結束才結束
time.sleep(0.3)
print(p.is_alive())
互斥鎖
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂
==並發運行,效率高==,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process
import os,time
def work():
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
if __name__ == '__main__':
for i in range(3):
p=Process(target=work)
p.start()
如何控制,就是加鎖處理。而互斥鎖的意思就是互相排斥,如果把多個進程比喻為多個人,互斥鎖的工作原理就是多個人都要去爭搶同一個資源:衛生間,一個人搶到衛生間後上一把鎖,其他人都要等著,等到這個完成任務後釋放鎖,其他人才有可能有一個搶到......所以互斥鎖的原理,就是把並發改成穿行,降低了效率,但保證了數據安全不錯亂。
#由並發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(lock):
lock.acquire() #加鎖
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
lock.release() #釋放鎖
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,))
p.start()
模擬搶票
from multiprocessing import Process, Lock ---> # 進程鎖
import time
import json
def search(name):
count = json.load(open(r"E:\第四模塊\並發編程\db.json", "r"))["count"]
print("<%s>查看了剩余票數為 %s" % (name, count))
def get(name):
time.sleep(1)
count_dict = json.load(open(r"E:\學\並發編程\db.json", "r"))
if count_dict["count"] > 0:
count_dict["count"] -= 1
time.sleep(3)
print("<%s> 購票成功" % name)
json.dump(count_dict, open(r"E:\並發編程\db.json", "w"))
else:
print("%s fail !" %name)
def task(name,meux):
search(name)
with meux: # 運用了上下文管理器的方法
get(name)
if __name__ == '__main__':
# f = open(r"E:\並發編程\db.json")
meux = Lock()
for i in range(10):
p = Process(target=task, args=("路人<%s>" % i, meux))
p.start()
進程隊列
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。
創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
但需要明確:
1、隊列內存放的是消息而非大數據
2、隊列占用的是內存空間,因而maxsize即便是無大小限制也受限於內存大小
主要方法介紹:
q.put方法用以插入數據到隊列中。
q.get方法可以從隊列讀取並且刪除一個元素。
隊列的使用
from multiprocessing import Queue
q = Queue(maxsize=3)
q.put("hello word")
q.put(123)
q.put([1, 234])
print("判斷隊列是否滿了", q.full())
q.put() 如果已經滿了 再放阻塞住了
print(q.get())
print(q.get())
print(q.get()) # 如果沒有數據q.get會卡住
print("判斷隊裏是否空了", q.empty())
生產者消費者模型介紹
為什麽要使用生產者消費者模型
生產者指的是生產數據的任務,消費者指的是處理數據的任務,在並發編程中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什麽是生產者和消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的。
生產者消費者模型實現
from multiprocessing import Process, Queue
import time
def producer(q):
for i in range(20):
time.sleep(0.6)
print("生產的第%s個包子" % (i + 1))
res = i + 1
q.put(res)
# q.put(None) # 不能這樣 因為每一個生產者的生產效率不知道,導致有個生產者生產完了放一個None
# 這樣 消費者就不吃了,但是其實另外一個生產者還有包子再生產,,隊列裏面有包子其實,但是消費者走了一個
def consumer(q):
while True: # while q.size()>0 不能這樣 因為剛開始就是0個這樣進不了循環導致消費者直接沒運行
res = q.get()
if res is None: # 判斷如果是none則表明生產者們都生產完了
break
time.sleep(1)
print("消費者吃的第%s包子" % res)
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,)) # 生產者們
p2 = Process(target=producer, args=(q,)) # 生產者們
p3 = Process(target=producer, args=(q,)) # 生產者們 3個
c1 = Process(target=consumer, args=(q,)) # 消費者們 2個
c2 = Process(target=consumer, args=(q,)) # 消費者們
p1.start() # 生產者們啟動
p2.start()
p3.start()
c1.start() # 消費者們啟動
c2.start()
p1.join() # 保證等待消費者們執行完畢主程序才往下走
p2.join()
p3.join()
q.put(None) # 生產完畢之後往隊列裏面放2 個None 因為 有兩個消費者 要發兩個終止信息
q.put(None)
print('主程序')
JoinableQueue([maxsize]) 的使用
這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
參數介紹
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹
JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
from multiprocessing import Process, JoinableQueue
import time
def producer(q):
for i in range(3):
time.sleep(0.6)
print("生產的第%s個包子" % (i + 1))
res = i + 1
q.put(res)
q.join() # 接收信號
def consumer(q):
while True: # while q.size()>0 不能這樣 因為剛開始就是0個這樣進不了循環導致消費者直接沒運行
res = q.get()
time.sleep(1)
print("消費者吃的第%s包子" % res)
q.task_done() # 這個信號就是說明q這個隊列已經全部取完了 結束掉q,
# 給q.join 發一個信號,如果不發送這個信號,因為生產者不是守護進程,所以主進程就會一直卡住,等著生產者結束
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q,)) # 生產者們
p2 = Process(target=producer, args=(q,)) # 生產者們
p3 = Process(target=producer, args=(q,)) # 生產者們 3個
c1 = Process(target=consumer, args=(q,)) # 消費者們 2個
c2 = Process(target=consumer, args=(q,)) # 消費者們
c1.daemon = True # 這樣設置的目的是主進程結束了 消費者沒必要存活了
c2.daemon = True
p1.start() # 生產者們啟動
p2.start()
p3.start()
c1.start() # 消費者們啟動
c2.start()
p1.join() # 這個是等待生產者都結束,這樣主進程才能結束,因為如果不等待生產者結束,
# 那麽主進程一結束,消費者就死了,沒有消費者了就不能取包子,於發信號了
p2.join()
p3.join()
print('主程序')
Process join方法 以及其他屬性