python之進程----Queue
阿新 • • 發佈:2017-08-25
random 其他屬性 put multi getpid 應用 int 初始化 aec 一、Queue是通過multiprocessing使用
from multiprocessing import Process,Queue import time import random import os def consumer(q): while True: res=q.get() if res is None: break time.sleep(random.randint(1,3)) print(‘\033[45m%s 吃了 %s\033[0m‘ % (os.getpid(), res))生產者,消費者模型1def producer(q): for i in range(5): time.sleep(2) res=‘包子%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) q.put(None) if __name__ == ‘__main__‘: q=Queue() #生產者們:廚師們 p1=Process(target=producer,args=(q,)) #消費者們:吃貨們 p2=Process(target=consumer,args=(q,)) p1.start() p2.start() p1.join() p2.join()print(‘主‘)
from multiprocessing import Process,Queue import time import random import os def consumer(q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print(‘\033[45m%s 吃了 %s\033[0m‘ % (os.getpid(), res)) def product_baozi(q):生產者,消費者模型2for i in range(3): time.sleep(2) res=‘包子%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) def product_jiaozi(q): for i in range(3): time.sleep(2) res=‘餃子%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) def product_dabing(q): for i in range(3): time.sleep(2) res=‘大餅%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) if __name__ == ‘__main__‘: q=Queue() #生產者們:廚師們 p1=Process(target=product_baozi,args=(q,)) p2=Process(target=product_jiaozi,args=(q,)) p3=Process(target=product_dabing,args=(q,)) #消費者們:吃貨們 p4=Process(target=consumer,args=(q,)) p5=Process(target=consumer,args=(q,)) p_l=[p1,p2,p3,p4,p5] for p in p_l: p.start() # for p in p_l: # p.join() # p1.start() # p2.start() # p3.start() # p4.start() # p5.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) p4.join() p5.join() print(‘主‘)
q .put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),
並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。
如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。
如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。
如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
1:可以往隊列裏放任意類型的數據 2 隊列:先進先出
from multiprocessing import Process,Queue q=Queue(3) q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) # q.put(‘fourht‘) print(q.get()) print(q.get()) print(q.get()) # print(q.get())q.put和q.get
from multiprocessing import Process,Queue q=Queue(3) q.put(‘first‘,block=False) q.put(‘second‘,block=False) q.put(‘third‘,block=False) # q.put(‘fourth‘,block=False) q.put(‘fourth‘,block=True,timeout=3) q.get(block=False) q.get(block=True,timeout=3) q.get_nowait() #q.get(block=False)p.get的參數 二、JoinableQueue同樣通過multiprocessing使用。 JoinableQueue的實例p除了與Queue對象相同的方法之外還具有: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數 量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
from multiprocessing import Process,JoinableQueue import time import random import os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘\033[45m%s 吃了 %s\033[0m‘ % (os.getpid(), res)) q.task_done() def product_baozi(q): for i in range(5): time.sleep(2) res=‘包子%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) q.join() if __name__ == ‘__main__‘: q=JoinableQueue() #生產者們:廚師們 p1=Process(target=product_baozi,args=(q,)) #消費者們:吃貨們 p4=Process(target=consumer,args=(q,)) p4.daemon=True p1.start() p4.start() p1.join() print(‘主‘)生產者,消費者模型3
from multiprocessing import Process,JoinableQueue import time import random import os def product_baozi(q): for i in range(3): time.sleep(2) res=‘包子%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) q.join() def product_jiaozi(q): for i in range(3): time.sleep(2) res=‘餃子%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) q.join() def product_dabing(q): for i in range(3): time.sleep(2) res=‘大餅%s‘ %i q.put(res) print(‘\033[44m%s 制造了 %s\033[0m‘ %(os.getpid(),res)) q.join() def consumer(q): while True: res = q.get() time.sleep(random.randint(1, 3)) print(‘\033[45m%s 吃了 %s\033[0m‘ % (os.getpid(), res)) q.task_done() if __name__ == ‘__main__‘: q=JoinableQueue() #生產者們:廚師們 p1=Process(target=product_baozi,args=(q,)) p2=Process(target=product_jiaozi,args=(q,)) p3=Process(target=product_dabing,args=(q,)) #消費者們:吃貨們 p4=Process(target=consumer,args=(q,)) p5=Process(target=consumer,args=(q,)) p4.daemon=True p5.daemon=True p_l=[p1,p2,p3,p4,p5] for p in p_l: p.start() p1.join() p2.join() p3.join() print(‘主‘)生產者,消費者模型4
三,互斥鎖
互斥鎖應用:
from multiprocessing import Process,Lock import os import time def work(mutex): mutex.acquire() print(‘task[%s] 上廁所‘ %os.getpid()) time.sleep(3) print(‘task[%s] 上完廁所‘ %os.getpid()) mutex.release() if __name__ == ‘__main__‘: mutex=Lock() p1=Process(target=work,args=(mutex,)) p2=Process(target=work,args=(mutex,)) p3=Process(target=work,args=(mutex,)) p1.start() p2.start() p3.start() print(‘主‘)互斥鎖
四、模擬搶票:
from multiprocessing import Process,Lock import json import time import random import os def search(): dic=json.load(open(‘db.txt‘,)) print(‘剩余票數%s‘ %dic[‘count‘]) def get_ticket(): dic=json.load(open(‘db.txt‘,)) if dic[‘count‘] > 0: dic[‘count‘]-=1 json.dump(dic,open(‘db.txt‘,‘w‘)) print(‘%s 購票成功‘ %os.getpid()) def task(mutex): search() time.sleep(random.randint(1, 3)) #模擬購票一系列繁瑣的過程所花費的時間 mutex.acquire() get_ticket() mutex.release() if __name__ == ‘__main__‘: mutex=Lock() for i in range(50): p=Process(target=task,args=(mutex,)) p.start()模擬搶票
五、process對象的其他屬性補充
from multiprocessing import Process import os import time def work(): print(‘%s is working‘ %os.getpid()) time.sleep(3) if __name__ == ‘__main__‘: p1=Process(target=work) p2=Process(target=work) p3=Process(target=work) p1.daemon=True p2.daemon=True p3.daemon=True p1.start() #初始化1 p2.start() #初始化2 p3.start() #初始化3 p3.join() p1.join() p2.join() print(‘基於初始化的結果來繼續運行‘)process屬性補充1
from multiprocessing import Process import os import time def work(): print(‘%s is working‘ %os.getpid()) time.sleep(3) if __name__ == ‘__main__‘: p1=Process(target=work) # p2=Process(target=work) # p3=Process(target=work) p1.start() #初始化1 # p2.start() #初始化2 # p3.start() #初始化3 # p1.terminate() # time.sleep(3) # print(p1.is_alive()) print(p1.name) print(p1.pid) print(‘基於初始化的結果來繼續運行‘)process屬性補充2
python之進程----Queue