程序間通訊:佇列,管道,檔案,共享記憶體,訊號量,事件,互斥鎖,socket
程序間通訊(IPC,inter-process communication):生產程序生產食物,消費程序購買食物,消費程序一直監視生產狀況,只要一有食物就將其取出來,如果取到食物None,兩者關係結束,於是主程序也結束。
遠端過程呼叫協議(remote procedure call protocal),需要某些傳輸協議
一般情況下 父程序會等子程序結束再結束
=====================================================================
1.程序間通訊常用方式:
程序間傳遞資料
(1)佇列:
from multiprocessing import Queue(multiprocessing.Queue)
(前一章:from queue import PriorityQueue
A.from multiprocessing import Queue(multiprocessing.Queue):
在多執行緒中傳遞資料,交換資料,進行通訊;
它與一般的資料結構中的佇列的差別為更加符合多程序的特性:序列化和反序列化;
對資料做了一些通訊商的加工,保證多程序的安全性;
在這裡使用多程序的資料通訊,不需要考慮多程序可能帶來的安全隱患,直接使用;
multiprocessing.Queue在程序間通訊是python比價提倡的一種方式
B.from queue import PriorityQueue(queue.PriorityQueue):
儲存資料的一種資料結構
(2)管道:
管道中,子程序間只能單向通訊,一個子程序在定義管道後只能get或者put;
conn1只能接訊息,conn2只能傳送訊息,不用的一端close(原因在於單快取),如果要實現雙向通訊,就設定兩根管道
但是佇列中子程序既可以get也可以put;
匿名管道:只能在父子程序中使用
命名管道:沒有沁園關係的程序之間也可以通訊
(3)檔案:
open,read,write,seek,tell,close
mode:r,w,a,b
with open('test.txt') as f: #該操作方式可以自行關閉檔案 不必f.close()
早期時各種硬體裝置都是私有介面,unix系統將裝置檔案化,用r,w,a,b等api介面對裝置進行操作
(4)共享記憶體
-------------------------------
程序間傳遞訊號
(5)訊號量
(6)事件
(7)互斥量
from multiprocessing import Lock
lock.acquire()
money.value += 1 #注意鎖的粒度,儘快最小話,開銷最小;粒度最小原則
lock.release()
--------------------------------
(8)Socket
=====================================================================
作業:
(1)佇列實現生產者消費者模型;生產者生產[0,1,2,3,4,5,6,7,8,9,10]
消費者能夠在處理完list序列後退出
(2)使用互斥鎖去完成銀行存取款操作,存款10000,A取10次,每次取100;B存5次,每次存200
(3)面向物件方法實現第一個作業的第二題myTimer
(1)
from multiprocessing import Queue
from multiprocessing import Process
import time
def consumer(input_q):
while True:
item = input_q.get()
if item == None:
break
print(item)
def producer(sequence,output_p):
for i in sequence:
time.sleep(1)
output_p.put(i)
if __name__ == '__main__':
q= Queue()
con_p = Process(target=consumer,args=(q,))
con_p.start()
sequence =[1,2,3,4,5,6,7,8,9,10]
producer(sequence,q)
q.put(None)
(2)
from multiprocessing import Process
from multiprocessing import Value #在不同的程序間共享的變數
from multiprocessing import Lock
import time
def deposit(money,lock): #存錢
for i in range(5):
time.sleep(0.001)
lock.acquire() #上鎖的操作
money.value += 200
lock.release()
def withdraw(money,lock): #取錢
for i in range(10):
time.sleep(0.001)
lock.acquire()
money.value -= 100
lock.release()
if __name__ == '__main__':
money = Value('i',2000)
lock = Lock()
d = Process(target=deposit,args=(money,lock))
d.start()
w = Process(target=withdraw,args=(money,lock))
w.start()
d.join()
w.join()
print(money.value)
(3)
from multiprocessing import Process
import time
import os
class Proces:
def __init__(set,interval):
self.interval = interval
def new_start(self):
p = Process(target = self.run,args =())
p.start()
print('child process id is %d,name is %s' % (p.pid,p.name))
print(os.getpriority(os.PRIO_PROCESS,p.pid))
os.setpriority(os.PRIO_PROCESS,p.pid,1) #獲取程序改優先順序
print(os.getpriority(os.PRIO_PROCESS,p.pid))
p.join()
print('main process')
class Timer(Proces):
def __init__(self,interval):
self.interval = interval
def run(self): #列印時間的方法
for i in range(5):
time.sleep(self.interval)
print(time.ctime()+'have a rest')
if __name__ == '__main__':
t = Timer(2) #建立定時器
t.new_start() #繼承process do not call run 父類中函式會自動呼叫這個run方法
================================================================================================
相關程式碼:
設定優先順序1-20,做一個模擬os的排程程式:某一時刻,四個程序:
Watch TV: random優先順序 import rnadom
Listen to music: random優先順序
Print Doc: 4
Write Doc: 4
建立佇列,資訊放進佇列,模擬佇列的排程(觀察取數情況,驗證時間片)
我的解法:
from queue import PriorityQueue
# import random
# class Item(object):
# def __init__(self,name,level): #self 可以將屬性寫進去
# self.name = name
# self.level = level
# def __repr__(self):
# return (str(self.name) + ':' + str(self.level))
# def __lt__(self,other): #小於的比較 重寫
# return self.level > other.level
# if __name__ == '__main__':
# q = PriorityQueue()
# q.put(Item('watch tv',random.randint(1,20)))
# q.put(Item('music',random.randint(1,20)))
# q.put(Item('print',4))
# q.put(Item('write',4))
# while not q.empty():
# print(q.get())
#===============================================================================================
多程序定期器:每一個小時,你的定時器提示你:不要coding,休息一下吧!
提示的同時顯示當前程序的pid,name,os模組設定優先順序
我的解法:
# from multiprocessing import Process
# import time
# def getTime(interval):
# while True:
# time.sleep(interval)
# print(time.ctime())
# if __name__ == '__main__':
# p = Process(target=getTime,args=(1,))
# p.start()
# print(p.name,p.pid) #子程序 子程序的id
# p.join() #等待子程序結束 實際是阻塞
# print('ending') #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞
#===============================================================================================
#顯示程序id,優先順序
# from multiprocessing import Process
# import time
# import os
# def getTime(interval):
# while True:
# time.sleep(interval)
# print(time.ctime())
# if __name__ == '__main__':
# p = Process(target=getTime,args=(1,))
# p.start()
# print('子程序的名字和id:',p.name,p.pid) #子程序 子程序的id
# print(os.getpriority(os.PRIO_PROCESS,p.pid))
# os.setpriority(os.PRIO_PROCESS,p.pid,1) #獲取程序改優先順序
# print(os.getpriority(os.PRIO_PROCESS,p.pid))
# p.join() #等待子程序結束 實際是阻塞
# print('ending') #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞
#===============================================================================================
#將定時器改成面向物件的形式@@@@@@@@@@@@@已經完成,在20171106程式碼檔案中
# from multiprocessing import Process
# import time
# import os
# class Timer(Process):
# def __init__(self,interval):
# self.sleep = interval
# def run(self): #列印時間的方法
time.sleep(interval)
print(time.ctime()+'have a rest')
print(os.getpriority(os.PRIO_PROCESS,p.pid))
os.setpriority(os.PRIO_PROCESS,p.pid,1) #獲取程序改優先順序
print(os.getpriority(os.PRIO_PROCESS,p.pid))
# if __name__ == '__main__':
# t = Timer(3600) #建立定時器
# t.start() #繼承process do not call run 父類中函式會自動呼叫這個run方法
#===============================================================================================
#===============================================================================================
20171104作業 - 老師的解法
設定優先順序1-20,做一個模擬os的排程程式:某一時刻,四個程序:
Watch TV: random優先順序 import rnadom
Listen to music: random優先順序
Print Doc: 4
Write Doc: 4
建立佇列,資訊放進佇列,模擬佇列的排程(觀察取數情況,驗證時間片)
# from queue import PriorityQueue
# import random
# class Item(object):
# def __init__(self,name,level): #self 可以將屬性寫進去
# self.name = name
# self.level = level
# def __repr__(self):
# return (str(self.name) + ':' + str(self.level))
# def __lt__(self,other): #小於的比較 重寫
# return self.level > other.level
# if __name__ == '__main__':
# q = PriorityQueue()
# q.put(Item('watch tv',random.randint(1,20)))
# q.put(Item('music',random.randint(1,20)))
# q.put(Item('print',4))
# q.put(Item('write',4))
# while not q.empty():
# print(q.get())
# q.put(Item('print',4+q.qsize()))
# q.put(Item('write',4+q.qsize()))
#===============================================================================================
20171104作業- 老師的解法
多程序定期器:每一個小時,你的定時器提示你:不要coding,休息一下吧!
提示的同時顯示當前程序的pid,name,os模組設定優先順序
# from multiprocessing import Process
# import time
# def getTime(interval):
# while True:
# time.sleep(interval)
# print(time.ctime())
# if __name__ == '__main__':
# p = Process(target=getTime,args=(1,))
# p.start()
# print(p.name,p.pid) #子程序 子程序的id
# p.join() #等待子程序結束 實際是阻塞
# print('ending') #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞
#==================================
#顯示程序id,優先順序
# from multiprocessing import Process
# import time
# import os
# def getTime(interval):
# while True:
# time.sleep(interval)
# print(time.ctime())
# if __name__ == '__main__':
# p = Process(target=getTime,args=(1,))
# p.start()
# print('子程序的名字和id:',p.name,p.pid) #子程序 子程序的id
# print(os.getpriority(os.PRIO_PROCESS,p.pid))
# os.setpriority(os.PRIO_PROCESS,p.pid,1) #獲取程序改優先順序
# print(os.getpriority(os.PRIO_PROCESS,p.pid))
# p.join() #等待子程序結束 實際是阻塞
# print('ending') #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞
#=================================
#將定時器改成面向物件的形式
# from multiprocessing import Process
# import time
# import os
# class Timer(Process):
# def __init__(self,interval):
# self.sleep = interval
# def run(self): #列印時間的方法
# time.sleep(interval)
# print(time.ctime()+'have a rest')
# print(os.getpriority(os.PRIO_PROCESS,p.pid))
# os.setpriority(os.PRIO_PROCESS,p.pid,1) #獲取程序改優先順序
# print(os.getpriority(os.PRIO_PROCESS,p.pid))
# if __name__ == '__main__':
# t = Timer(3600) #建立定時器
# t.start() #繼承process do not call run 父類中函式會自動呼叫這個run方法
#===============================================================================================
#===============================================================================================
20171106課堂練習程式碼
#生產消費模型
# from multiprocessing import Queue
# from multiprocessing import Process
# import time
# #消費者模型
# def consumer(input_q):
# time.sleep(2)
# while not input_q.empty():
# print(input_q.get())
# #生產者邏輯
# def producer(sequence,output_p):
# for i in sequence:
# output_p.put(i)
# if __name__ == '__main__':
# q= Queue()
# con_p = Process(target=consumer,args=(q,))
# con_p.start()
# sequence =[1,2,3,4,5]
# producer(sequence,q)
#=================================
#生產消費模型
# from multiprocessing import Queue
# from multiprocessing import Process
# import time
# #消費者模型
# def consumer(input_q):
# time.sleep(2)
# while True:
# if not input_q.empty():
# print(input_q.get())
# else:
# break
# #生產者邏輯
# def producer(sequence,output_p):
# for i in sequence:
# output_p.put(i)
# if __name__ == '__main__':
# q= Queue()
# con_p = Process(target=consumer,args=(q,))
# con_p.start()
# sequence =[1,2,3,4,5]
# producer(sequence,q)
#=================================
# #生產消費模型
# from multiprocessing import Queue
# from multiprocessing import Process
# import time
# #消費者模型
# def consumer(input_q): #input_q 宣告引數 從生產者那裡拿走物品(資料,佇列資料)
# while True:
# item = input_q.get()
# if item == None: #用None作為標誌,判斷生產者是否已經結束了生產
# break
# print(item)
# #生產者邏輯
# def producer(sequence,output_p):
# for i in sequence:
# time.sleep(1)
# output_p.put(i)
# if __name__ == '__main__':
# q= Queue()
# con_p = Process(target=consumer,args=(q,)) #消費者程序 建立一個程序,程序的目標一定是個函式名,或者值類中的函式名即方法名,程序的引數
# con_p.start()
# sequence =[1,2,3,4,5]
# # pro_p = Process(target=producer,args=(sequence,q)) #消費者程序
# # pro_p.start()
# producer(sequence,q) # 生產者開始生產數字12345,放入佇列中
# q.put(None) # 向佇列中放入一個空值(或者說是生產一個空值),空值也是值,是一個特殊的值
#=================================
# #生產消費模型 佇列 演示一個子程序既可以put 也可以get
# from multiprocessing import Queue
# from multiprocessing import Process
# import time
# #消費者模型
# def consumer(input_q): #input_q 宣告引數 從生產者那裡拿走物品(資料,佇列資料)
# while True:
# item = input_q.get()
# if item == None: #用None作為標誌,判斷生產者是否已經結束了生產
# # input_q.put(6)
# break
# print(item)
# #生產者邏輯
# def producer(sequence,output_p):
# for i in sequence:
# time.sleep(1)
# output_p.put(i)
# if __name__ == '__main__':
# q= Queue()
# con_p = Process(target=consumer,args=(q,)) #消費者程序 建立一個程序,程序的目標一定是個函式名,或者值類中的函式名即方法名,程序的引數
# con_p.start()
# sequence =[1,2,3,4,5]
# # pro_p = Process(target=producer,args=(sequence,q)) #消費者程序
# # pro_p.start()
# producer(sequence,q) # 生產者開始生產數字12345,放入佇列中
# # con_p.join() #一定等子程序結束,保證consumer也能向佇列中新增數字6 表明對列中子程序的雙方既可以put,也可以get,而管道只能單向,一個子程序在定義管道後只能get或者put
# q.put(None) # 向佇列中放入一個空值(或者說是生產一個空值),空值也是值,是一個特殊的值
#===============================================================================================
# #管道操作-僅瞭解
# from multiprocessing import Pipe
# from multiprocessing import Process
# #消費者模型
# def consumer(pipe):
# (conn1,conn2) = pipe #兩個變數構成一個管道,管道是設定的引數pipe
# conn2.close() #關閉管道的傳送方!!!
# while True:
# try:
# item = conn1.recv()
# print(item)
# except EOFError: #檔案讀取錯誤
# break
# print('consumer ending done')
# #生產者邏輯
# def producer(sequence,sendPipe):
# for i in sequence:
# sendPipe.send(i)
# if __name__ == '__main__':
# (conn1,conn2) = Pipe() #建立管道,conn1是接收方 conn2是傳送方
# con_p = Process(target=consumer,args=((conn1,conn2),)) #建立消費者子程序,把管道(元組)作為引數傳入;生產者為主程序
# con_p.start()
# conn1.close() #關閉生產者的輸出管道
# sequence =[1,2,3,4,5] #這裡還要用到傳送方
# producer(sequence,conn2)
# conn2.close()
#=================================
# #管道操作- 僅瞭解
# from multiprocessing import Pipe
# from multiprocessing import Process
# #消費者模型
# def consumer(pipe):
# (conn1,conn2) = pipe
# conn2.close()
# while True:
# try:
# item = conn1.recv()
# except EOFError:
# conn1.close()
# break
# print(item)
# print('consumer ending done')
# #生產者邏輯
# def producer(sequence,sendPipe):
# for i in sequence:
# sendPipe.send(i)
# if __name__ == '__main__':
# (conn1,conn2) = Pipe() #建立管道,conn1是接收方 conn2是傳送方
# con_p = Process(target=consumer,args=((conn1,conn2),)) #建立消費者子程序,把管道(元組)作為引數傳入;生產者為主程序
# con_p.start()
# conn1.close() #關閉生產者的輸出管道
# sequence =[1,2,3,4,5] #這裡還要用到傳送方
# producer(sequence,conn2)
# conn2.close()
#===============================================================================================
# #銀行存取款demo1 正常
# from multiprocessing import Process
# from multiprocessing import Value #在不同的程序間共享的變數
# import time
# def deposit(money): #存錢
# for i in range(100):
# money.value += 1
# def withdraw(money): #取錢
# for i in range(100):
# money.value -= 1
# if __name__ == '__main__':
# money = Value('i',2000) #共享變數 int 數值2000
# d = Process(target=deposit,args=(money,))
# d.start()
# w = Process(target=withdraw,args=(money,))
# w.start()
# d.join()
# w.join()
# print(money.value) #三個程序一起跑 不一定會執行詞句 需要等待子程序結束
#=================================
# #銀行存取款demo1 - 存取款異常
前提知識1:多程序不能訪問同一個全域性變數,原因是不同的程序使用的虛擬記憶體空間不同,而全域性變數實際是用的同一個記憶體空間,
因此需要 import Value 生成一個能夠在不同的程序間共享和操作的變數
前提知識2:對於time.sleep ,在一個程式中,子程序之間會搶佔資源,比如搶佔 value ,看a在睡醒的時刻是否能夠搶佔到資源value
# from multiprocessing import Process
# from multiprocessing import Value #在不同的程序間共享的變數
# import time
# def deposit(money): #存錢
# for i in range(100):
# time.sleep(0.1)
# money.value += 1
# def withdraw(money): #取錢
# for i in range(100):
# time.sleep(0.1)
# money.value -= 1
# if __name__ == '__main__':
# money = Value('i',2000) #共享變數 int 數值2000
# d = Process(target=deposit,args=(money,))
# d.start()
# w = Process(target=withdraw,args=(money,))
# w.start()
# d.join()
# w.join()
# print(money.value) #三個程序一起跑 不一定會執行詞句 需要等待子程序結束
#=================================
#銀行存取款demo2 - 存取款異常-解決 —— 鎖1
from multiprocessing import Process
from multiprocessing import Value #在不同的程序間共享的變數
from multiprocessing import Lock
import time
def deposit(money,lock): #存錢
# lock.acquire() #上鎖的操作
for i in range(100):
time.sleep(10)
money.value += 1
# lock.release()
def withdraw(money,lock): #取錢
# lock.acquire() #上鎖的操作
for i in range(100):
time.sleep(0.0001)
money.value -= 1
# lock.release()
if __name__ == '__main__':
money = Value('i',2000) #共享變數 int 數值2000
lock = Lock() #銀行為了安全 加了一把互斥鎖
d = Process(target=deposit,args=(money,lock))
d.start()
w = Process(target=withdraw,args=(money,lock))
w.start()
d.join()
w.join()
print(money.value) #三個程序一起跑 不一定會執行詞句 需要等待子程序結束
#=================================
# #銀行存取款demo3 - 存取款異常-解決 —— 鎖2 -更優 因為鎖的力度更小,時間更短
# from multiprocessing import Process
# from multiprocessing import Value #在不同的程序間共享的變數
# from multiprocessing import Lock
# import time
# def deposit(money,lock): #存錢
# for i in range(100):
# time.sleep(0.1)
# lock.acquire() #上鎖的操作
# money.value += 1
# lock.release()
# def withdraw(money,lock): #取錢
# for i in range(100):
# time.sleep(0.1)
# lock.acquire() #上鎖的操作
# money.value -= 1
# lock.release()
# if __name__ == '__main__':
# money = Value('i',2000) #共享變數 int 數值2000
# lock = Lock() #銀行為了安全 加了一把互斥鎖
# d = Process(target=deposit,args=(money,lock))
# d.start()
# w = Process(target=withdraw,args=(money,lock))
# w.start()
# d.join()
# w.join()
# print(money.value) #三個程序一起跑 不一定會執行詞句 需要等待子程序結束,因此新增join方法