鐵樂學python_Day39_多進程和multiprocess模塊2
鐵樂學python_Day39_多進程和multiprocess模塊2
鎖 —— multiprocess.Lock (進程同步)
之前我們千方百計實現了程序的異步,讓多個任務可以同時在幾個進程中並發處理,
但是它們之間的運行沒有順序,一旦開啟也不受我們控制。
盡管並發編程能讓我們更加充分的利用IO資源,但是也會帶來新的問題。
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
遇到數據、安全比速度重要的場景,我們就需要將進程變回受同步控制。
例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
import time
import random
from multiprocessing import Process
def work(person, n):
print(‘%s 開始闖黃道十二宮中的 [第%s宮] 了!‘ % (person, n))
time.sleep(random.random())
print(‘%s 闖過了 [第%s宮]!‘ % (person, n))
if __name__ == ‘__main__‘:
start = [‘星矢‘, ‘一輝‘, ‘冰河‘, ‘紫龍‘, ‘阿瞬‘]
for i in range(1, 13):
role = random.choice(start)
p = Process(target=work, args=(role, i))
p.start()
星矢 開始闖黃道十二宮中的 [第4宮] 了!
阿瞬 開始闖黃道十二宮中的 [第1宮] 了!
星矢 開始闖黃道十二宮中的 [第6宮] 了!
一輝 開始闖黃道十二宮中的 [第5宮] 了!
紫龍 開始闖黃道十二宮中的 [第2宮] 了!
紫龍 開始闖黃道十二宮中的 [第7宮] 了!
星矢 開始闖黃道十二宮中的 [第3宮] 了!
冰河 開始闖黃道十二宮中的 [第9宮] 了!
紫龍 開始闖黃道十二宮中的 [第8宮] 了!
一輝 開始闖黃道十二宮中的 [第11宮] 了!
阿瞬 開始闖黃道十二宮中的 [第12 宮] 了!
冰河 開始闖黃道十二宮中的 [第10宮] 了!
一輝 闖過了 [第5宮]!
紫龍 闖過了 [第7宮]!
紫龍 闖過了 [第8宮]!
星矢 闖過了 [第4宮]!
一輝 闖過了 [第11宮]!
阿瞬 闖過了 [第12宮]!
冰河 闖過了 [第9宮]!
冰河 闖過了 [第10宮]!
紫龍 闖過了 [第2宮]!
阿瞬 闖過了 [第1宮]!
星矢 闖過了 [第6宮]!
星矢 闖過了 [第3宮]!
例:加鎖後
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
import time
import random
from multiprocessing import Process, Lock
def work(lock, person, n):
# 加鎖由並發變成了串行,犧牲了運行效率,但避免了對資源的競爭
lock.acquire()
print(‘%s 開始闖黃道十二宮中的 [第%s宮] 了!‘ % (person, n))
time.sleep(random.random())
print(‘%s 闖過了 [第%s宮]!‘ % (person, n))
# lock中的方法,acquire和release可說是一對組合,先加鎖再解鎖並將鎖重置
lock.release()
if __name__ == ‘__main__‘:
# Lock得先創造一個實例出來,才方便使用它當中的方法
lock = Lock()
start = [‘星矢‘, ‘一輝‘, ‘冰河‘, ‘紫龍‘, ‘阿瞬‘]
for i in range(1, 13):
role = random.choice(start)
p = Process(target=work, args=(lock, role, i))
p.start()
星矢 開始闖黃道十二宮中的 [第2宮] 了!
星矢 闖過了 [第2宮]!
一輝 開始闖黃道十二宮中的 [第4宮] 了!
一輝 闖過了 [第4宮]!
紫龍 開始闖黃道十二宮中的 [第1宮] 了!
紫龍 闖過了 [第1宮]!
冰河 開始闖黃道十二宮中的 [第5宮] 了!
冰河 闖過了 [第5宮]!
星矢 開始闖黃道十二宮中的 [第8宮] 了!
星矢 闖過了 [第8宮]!
星矢 開始闖黃道十二宮中的 [第3宮] 了!
星矢 闖過了 [第3宮]!
冰河 開始闖黃道十二宮中的 [第6宮] 了!
冰河 闖過了 [第6宮]!
冰河 開始闖黃道十二宮中的 [第9宮] 了!
冰河 闖過了 [第9宮]!
阿瞬 開始闖黃道十二宮中的 [第7宮] 了!
阿瞬 闖過了 [第7宮]!
冰河 開始闖黃道十二宮中的 [第12宮] 了!
冰河 闖過了 [第12宮]!
阿瞬 開始闖黃道十二宮中的 [第10宮] 了!
阿瞬 闖過了 [第10宮]!
星矢 開始闖黃道十二宮中的 [第11宮] 了!
星矢 闖過了 [第11宮]!
同步控制
只要用到了鎖,鎖之內的代碼就變成同步的了。
鎖 :控制一段代碼,同一時間,只能被一個進程執行。
上面這種情況雖然使用加鎖的形式實現了順序的執行,
但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。
接下來,我們以模擬搶票為例,來看看數據安全的重要性。
例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
‘‘‘
文件db的內容為:{"ticket":2}
註意一定要用雙引號,不然json無法識別。
模擬並發運行,效率高, 但競爭對同一個文件寫入,數據寫入混亂。
不安全,保障不了數據的準確。
‘‘‘
import time
import json
import random
from multiprocessing import Process
def search():
dic = json.load(open(‘db‘))
print(‘\033[32m剩余票數%s\033[0m‘ % dic[‘ticket‘])
def get():
dic = json.load(open(‘db‘))
# 模擬讀數據的網絡延遲
time.sleep(random.random())
if dic[‘ticket‘] > 0:
dic[‘ticket‘] -= 1
# 模擬寫數據的網絡延遲
time.sleep(random.random())
json.dump(dic, open(‘db‘, ‘w‘))
print(‘\033[31m購票成功\033[0m‘)
def task():
search()
get()
if __name__ == ‘__main__‘:
# 模擬並發10個客戶端搶票
for i in range(10):
p = Process(target=task)
p.start()
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
購票成功
購票成功
購票成功
購票成功
購票成功
購票成功
購票成功
購票成功
購票成功
購票成功
可見數據並不準確。需要加鎖來保證數據安全和準確性 。
例:加鎖後
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
‘‘‘
文件db的內容為:{"ticket":2}
註意一定要用雙引號,不然json無法識別。
模擬對購票行為進行加鎖安全,保障了數據的準確和安全。
‘‘‘
import time
import json
import random
from multiprocessing import Process,Lock
def search():
dic = json.load(open(‘db‘))
print(‘\033[32m剩余票數%s\033[0m‘ % dic[‘ticket‘])
def get():
dic = json.load(open(‘db‘))
# 模擬讀數據的網絡延遲
time.sleep(random.random())
if dic[‘ticket‘] > 0:
dic[‘ticket‘] -= 1
# 模擬寫數據的網絡延遲
time.sleep(random.random())
json.dump(dic, open(‘db‘, ‘w‘))
print(‘\033[31m購票成功\033[0m‘)
def task(lock):
# 註意,別忘了傳Lock的實例化對象進來
search()
# 搜索不涉及寫入操作,且搜索需要速度快,可以不加鎖
# 到搶票和寫入購票成功,余票減1,就得對操作數據的代碼加鎖了
lock.acquire()
get()
lock.release()
if __name__ == ‘__main__‘:
lock = Lock()
# 模擬並發10個客戶端搶票
for i in range(10):
p = Process(target=task, args=(lock,))
p.start()
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
剩余票數2
購票成功
購票成功
例:較為完善提示的模擬搶票(加鎖)
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
‘‘‘
文件db的內容為:{"ticket":2}
註意一定要用雙引號,不然json無法識別。
模擬對購票行為進行加鎖安全,保障了數據的準確和安全。
‘‘‘
import json
import time
import random
from multiprocessing import Process, Lock
def check_ticket(i):
with open(‘db‘) as f:
ticket_count = json.load(f)
print(‘person%s查詢當前余票 :‘ % i, ticket_count[‘ticket‘])
def buy_ticket(i, lock):
check_ticket(i)
lock.acquire()
with open(‘db‘) as f:
ticket_count = json.load(f)
time.sleep(random.random())
if ticket_count[‘ticket‘] > 0:
print(‘person%s購票成功‘ % i)
ticket_count[‘ticket‘] -= 1
else:
print(‘余票不足,person%s購票失敗‘ % i)
time.sleep(random.random())
with open(‘db‘, ‘w‘) as f:
json.dump(ticket_count, f)
lock.release()
if __name__ == ‘__main__‘:
lock = Lock()
for i in range(5):
Process(target=buy_ticket, args=[i, lock]).start()
person2查詢當前余票 : 2
person3查詢當前余票 : 2
person1查詢當前余票 : 2
person0查詢當前余票 : 2
person4查詢當前余票 : 2
person2購票成功
person3購票成功
余票不足,person1購票失敗
余票不足,person0購票失敗
余票不足,person4購票失敗
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
1、效率高(多個進程共享一塊內存的數據)
2、幫我們處理好鎖問題。
這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,
避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
信號量 —— multiprocess.Semaphore
互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。
例如商場裏有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;
每調用一次release(),計數器加1.當計數器為0時,acquire()調用被阻塞。
這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。
信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念。
例:KTV迷你唱吧
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# KTV 4個房子
import time
import random
from multiprocessing import Process, Semaphore
def ktv(i, sem):
sem.acquire()
print(‘person %s 進來唱歌了‘ % i)
time.sleep(random.randint(1, 5))
print(‘person %s 從ktv出去了‘ % i)
sem.release()
if __name__ == ‘__main__‘:
sem = Semaphore(4)
for i in range(10):
Process(target=ktv, args=(i, sem)).start()
# 鎖+計數器
# .acquire() 計數器-1
# 計數器減為0 = 阻塞
# .release() 計數器+1
person 0 進來唱歌了
person 3 進來唱歌了
person 1 進來唱歌了
person 2 進來唱歌了
person 2 從ktv出去了
person 4 進來唱歌了
person 0 從ktv出去了
person 6 進來唱歌了
person 3 從ktv出去了
person 7 進來唱歌了
person 7 從ktv出去了
person 5 進來唱歌了
person 1 從ktv出去了
person 8 進來唱歌了
person 8 從ktv出去了
person 9 進來唱歌了
person 4 從ktv出去了
person 6 從ktv出去了
person 5 從ktv出去了
person 9 從ktv出去了
事件 —— multiprocess.Event
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:
全局定義了一個“Flag”,如果“Flag”值為 False,
那麽當程序執行 event.wait 方法時就會阻塞,
如果“Flag”值為True,那麽event.wait 方法時便不再阻塞。
clear:將“Flag”設置為False
set:將“Flag”設置為True
例:紅綠燈
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
import time
import random
from multiprocessing import Process, Event
def car(i,e): # 感知狀態的變化
if not e.is_set(): # 當前這個事件的狀態如果是False
print(‘car%s正在等待‘%i) # 這輛車正在等待通過路口
e.wait() # 阻塞 直到有一個e.set行為 # 等紅燈
print(‘car%s通過路口‘%i)
def traffic_light(e): # 修改事件的狀態
print(‘\033[1;31m紅燈亮\033[0m‘)
# 事件在創立之初的狀態是False,相當於程序中的紅燈
time.sleep(2) # 紅燈亮2s
while True:
if not e.is_set(): # False
print(‘\033[1;32m綠燈亮\033[0m‘)
e.set()
elif e.is_set():
print(‘\033[1;31m紅燈亮\033[0m‘)
e.clear()
time.sleep(2)
if __name__ == ‘__main__‘:
e = Event()
Process(target=traffic_light, args=[e, ]).start()
for i in range(10):
time.sleep(random.randrange(0, 5, 2))
Process(target=car, args=[i, e]).start()
紅燈亮
綠燈亮
紅燈亮
car1正在等待
car2正在等待
綠燈亮
car2通過路口
car1通過路口
紅燈亮
car3正在等待
car4正在等待
綠燈亮
car4通過路口
car3通過路口
紅燈亮
car5正在等待
綠燈亮
car5通過路口
car6通過路口
car8通過路口
car7通過路口
紅燈亮
car9正在等待
car10正在等待
綠燈亮
car10通過路口
car9通過路口
car11通過路口
紅燈亮
綠燈亮
car12通過路口
car13通過路口
紅燈亮
綠燈亮
car14通過路口
紅燈亮
綠燈亮
car15通過路口
紅燈亮
car16正在等待
car17正在等待
綠燈亮
car17通過路口
car16通過路口
car18通過路口
car19通過路口
car20通過路口
紅燈亮
綠燈亮
進程間通信——隊列和管道 Queue和Pipe
IPC(Inter-Process Communication)
隊列
概念介紹
創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
Queue([maxsize])
創建共享的進程隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
另外,還需要運行支持線程以便隊列中的數據傳輸到底層管道中。
Queue的實例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。
block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。
timeout是可選超時時間,用在阻塞模式中。
如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。
block控制阻塞行為,默認為True。
如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。
timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。
q.qsize()
返回隊列中目前項目的正確數量。
此函數的結果並不可靠,因為在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。
在某些系統上,此方法可能引發NotImplementedError異常。
q.empty()
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。
也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
q.close()
關閉隊列,防止隊列中加入更多數據。
調用此方法時,後臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。
如果q被垃圾收集,將自動調用此方法。
關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。
例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.cancel_join_thread()
不會再進程退出時自動連接後臺線程。這可以防止join_thread()方法阻塞。
q.join_thread()
連接隊列的後臺線程。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。
默認情況下,此方法由不是q的原始創建者的所有進程調用。
調用q.cancel_join_thread()方法可以禁止這種行為。
生產者消費者模型
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題。
該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什麽要使用生產者和消費者模式?
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。
在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。
同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。
為了解決這個問題於是引入了生產者和消費者模式。
什麽是生產者消費者模式?
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。
生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,
所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,
消費者不找生產者要數據,而是直接從阻塞隊列裏取,
阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
import time
import random
from multiprocessing import Process, Queue
# 消費者
def consumer(q, name):
# 吃貨,一直吃
while True:
food = q.get()
# 當food取值為done時,消費者跳出循環停止消費
if food == ‘done‘:break
# 模擬消費的延遲
time.sleep(random.random())
print(‘%s 吃了 %s‘ % (name, food))
def producer(q, name, food):
# 與消費者一直吃不同,生產者一開始就是在有限範圍內循環,所以用for
for i in range(1, 6):
time.sleep(random.random())
print(‘%s 生產出%s%s‘ % (name, food, i))
q.put(‘%s%s‘ % (food, i))
if __name__ == ‘__main__‘:
# 創建一個隊列
q = Queue()
p1 = Process(target=producer, args=[q, ‘蜜蜂‘, ‘蜂蜜‘])
p2 = Process(target=producer, args=[q, ‘蘋果樹‘, ‘蘋果‘])
p1.start()
p2.start()
Process(target=consumer, args=[q, ‘鐵樂‘]).start()
Process(target=consumer, args=[q, ‘貓‘]).start()
p1.join()
p2.join()
# 有兩個消費者,就要在隊列加入兩個標誌結束的done
q.put(‘done‘)
q.put(‘done‘)
蜜蜂 生產出蜂蜜1
蜜蜂 生產出蜂蜜2
貓 吃了 蜂蜜1
蘋果樹 生產出蘋果1
貓 吃了 蘋果1
鐵樂 吃了 蜂蜜2
蜜蜂 生產出蜂蜜3
蘋果樹 生產出蘋果2
鐵樂 吃了 蘋果2
貓 吃了 蜂蜜3
蜜蜂 生產出蜂蜜4
蘋果樹 生產出蘋果3
貓 吃了 蘋果3
鐵樂 吃了 蜂蜜4
蜜蜂 生產出蜂蜜5
蘋果樹 生產出蘋果4
蘋果樹 生產出蘋果5
貓 吃了 蜂蜜5
鐵樂 吃了 蘋果4
貓 吃了 蘋果5
JoinableQueue([maxsize])
創建可連接的共享進程隊列。
這就像是一個Queue對象,但隊列允許項目的使用者通知生產者項目已經被成功處理。
通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue的實例p除了與Queue對象相同的方法之外,還具有以下方法:
q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。
如果調用此方法的次數大於從隊列中刪除的項目數量,將引發ValueError異常。
q.join()
生產者將使用此方法進行阻塞,直到隊列中所有項目均被處理。
阻塞將持續到為隊列中的每個項目均調用q.task_done()方法為止。
下面的例子說明如何建立永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
from multiprocessing import Process, JoinableQueue
import time, random, os
def consumer(q):
while True:
food = q.get()
time.sleep(random.randint(1, 3))
print(‘\033[32m%s 吃 %s\033[0m‘ % (os.getpid(), food))
q.task_done()
# 向q.join()發送一次信號,證明一個數據已經被取走了,
# 和普通隊列不同的是用不到發送標誌結束的信號來break終止循環
def producer(name, q):
for i in range(1, 6):
time.sleep(random.randint(1, 3))
res = ‘%s%s‘ % (name, i)
q.put(res)
print(‘\033[34m%s 生產了 %s\033[0m‘ % (os.getpid(), res))
q.join() # 生產完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。
if __name__ == ‘__main__‘:
# 創建JoinableQueue類型隊列
q = JoinableQueue()
# 生產者們:即廚師們
p1 = Process(target=producer, args=(‘蒸水蛋‘, q))
p2 = Process(target=producer, args=(‘清蒸魚‘, q))
p3 = Process(target=producer, args=(‘醬油雞‘, q))
# 消費者們:即吃貨們
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
# 設為守護進程
c1.daemon = True
c2.daemon = True
# 開始
p_l = [p1, p2, p3, c1, c2]
for p in p_l:
p.start()
p1.join()
p2.join()
p3.join()
print(‘主進程結束‘)
# 主進程等--->p1,p2,p3等---->c1,c2
# p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據
# 因而c1,c2也沒有存在的價值了,不需要繼續阻塞在進程中影響主進程了。應該隨著主進程的結束而結束,所以設置成守護進程就可以了。
6448 生產了 蒸水蛋1
6416 吃 蒸水蛋1
6448 生產了 蒸水蛋2
6796 生產了 清蒸魚1
6972 生產了 醬油雞1
6972 生產了 醬油雞2
6448 生產了 蒸水蛋3
6680 吃 蒸水蛋2
6416 吃 清蒸魚1
6972 生產了 醬油雞3
6448 生產了 蒸水蛋4
6796 生產了 清蒸魚2
6972 生產了 醬油雞4
6680 吃 醬油雞1
6416 吃 醬油雞2
6448 生產了 蒸水蛋5
6796 生產了 清蒸魚3
6972 生產了 醬油雞5
6680 吃 蒸水蛋3
6416 吃 醬油雞3
6796 生產了 清蒸魚4
6796 生產了 清蒸魚5
6680 吃 蒸水蛋4
6416 吃 清蒸魚2
6416 吃 蒸水蛋5
6680 吃 醬油雞4
6416 吃 清蒸魚3
6680 吃 醬油雞5
6416 吃 清蒸魚4
6680 吃 清蒸魚5
主進程結束
管道 PIPE
創建管道的類:
Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),
其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道。
參數介紹:
dumplex:默認管道是全雙工的,如果將duplex設成False,conn1只能用於接收,conn2只能用於發送。
主要方法:
conn1.recv():接收conn2.send(obj)發送的對象。
如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那麽recv方法會拋出EOFError。
conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象。
其他方法:
conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法。
conn1.fileno():返回連接使用的整數文件描述符。
conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。
如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。
maxlength指定要接收的最大字節數。
如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。
如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):
通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,
而size是要發送字節數。
結果數據以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收。
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,
該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。
offset指定緩沖區中放置消息處的字節位移。
返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。
例:
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
from multiprocessing import Process, Pipe
def f(conn):
# 管道右端發送消息
conn.send("Hello The_Third_Wave")
conn.close()
if __name__ == ‘__main__‘:
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
# 管道另一端接收消息
print(parent_conn.recv())
p.join()
註意管道端點的正確管理問題。
如果是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。
這也說明了為何在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。
如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起。
管道是由操作系統進行引用計數的,必須在所有進程中關閉管道後才能生成EOFError異常。
因此,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。
例:
from multiprocessing import Process, Pipe
def f(parent_conn,child_conn):
parent_conn.close() #不寫close將不會引發EOFError,而將此句註釋則能正常運行
while True:
try:
print(child_conn.recv())
except EOFError:
child_conn.close()
if __name__ == ‘__main__‘:
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(parent_conn, child_conn,))
p.start()
child_conn.close()
parent_conn.send(‘hello‘)
parent_conn.close()
p.join()
OSError: handle is closed
另外,多個進程之間使用管道競爭之間會引起數據不安全,而為此,管道+鎖就會變成隊列的實現。
end
鐵樂學python_Day39_多進程和multiprocess模塊2