python queue
線程中的Queue
1 import time 2 import threading 3 import queue 4 import random 5 6 def putMessage(): 7 for i in "Hello World!!!": 8 q.put(i) 9 time.sleep(random.random()) 10 # print("size:%s"%q.qsize()) # 查看隊列長度 11 # 12 # print("full:%s"%q.full()) # 查看隊列是否為滿的狀態13 # 14 # print("empty:%s"%q.empty()) # 查看隊列是否為空的狀態 15 16 17 def getMessage(): 18 while True: 19 if not q.empty(): 20 print(q.get()) 21 else: 22 time.sleep(random.random()) 23 24 25 if __name__ == "__main__": 26 q = queue.Queue()27 28 t1 = threading.Thread(target=putMessage) 29 t1.setDaemon(True) 30 t1.start() 31 32 t2 = threading.Thread(target=getMessage) 33 t2.setDaemon(True) 34 t2.start() 35 36 time.sleep(10)
進程中的Queue
1 from multiprocessing import Queue 2 3 q = Queue(3) # 初始化一個Queue對象,最多可以put三條信息,如果不寫3,那麽久無限制4 5 q.put("Message01") # 添加信息的方法 6 q.put("Message02") 7 print(q.full()) # 查看 隊列 是否滿了的方法 8 9 q.put("Message03") 10 print(q.full()) 11 12 # 因為隊列已經滿了,所以下面的消息會出現異常,第一個 try 會等待2秒後再拋出異常, 13 # 第二個 try 會立刻拋出異常 14 try: 15 q.put("Message04", True, 2) 16 except: 17 print("消息隊列已滿,現有消息數量:%s"%q.qsize()) 18 19 try: 20 q.put_nowait("Message04") 21 except: 22 print("消息隊列已滿,現有消息數量:%s"%q.qsize()) 23 24 # 推薦使用的方式,先判斷隊列是否已滿,再寫入 25 if not q.full(): 26 q.put_nowait("Message04") 27 28 # 讀取消息的時候,先判斷消息隊列是否為空,再讀取 29 if not q.empty(): 30 for i in range(q.qsize()): 31 print(q.get_nowait())
隊列:
為什麽要用隊列?列表也很好用啊。:數據安全
創建方法:
模式1:FIFO -- queue.Queue()
模式2:FILO -- queue.LifoQueue()
模式3:priorty -- queue.PriorityQueue()
q.put([1, ‘hello‘])
q.put([2, ‘world‘])
級別 1 比 2 低, 1 先出來
方法的參數:
put()
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常
get()
調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。
其它方法:
q.empty() 如果隊列為空,返回True,反之False
q.full() 如果隊列滿了,返回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 獲取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號
q.join() 實際上意味著等到隊列為空,再執行別的操作
# join多少次,就需要用幾次 task_done
多進程優點:
1. 可以利用多核實現並行運算
缺點:
1. 開銷大
2. 通信困難
管道Pipe
multiprocessing.Pipe([duplex])
返回2個連接對象(conn1, conn2),代表管道的兩端,
默認是雙向通信.如果duplex=False,conn1只能用來接收消息,conn2只能用來發送消息.
主要用到的方法:
send() 發送數據
recv() 接收數據
1 import multiprocessing 2 3 4 from multiprocessing import Process, Pipe 5 6 def send(pipe): 7 pipe.send([‘spam‘] + [42, ‘egg‘]) 8 pipe.close() 9 10 def talk(pipe): 11 pipe.send(dict(name = ‘Bob‘, spam = 42)) 12 reply = pipe.recv() 13 print(‘talker got:‘, reply) 14 15 if __name__ == ‘__main__‘: 16 (con1, con2) = Pipe() 17 sender = Process(target = send, name = ‘send‘, args = (con1, )) 18 sender.start() 19 print("con2 got: %s" % con2.recv())#從send收到消息 20 con2.close() 21 22 (parentEnd, childEnd) = Pipe() 23 child = Process(target = talk, name = ‘talk‘, args = (childEnd,)) 24 child.start() 25 print(‘parent got:‘, parentEnd.recv()) 26 parentEnd.send({x * 2 for x in ‘spam‘}) 27 child.join() 28 print(‘parent exit‘)
進程間的信息共享Manage
Python中進程間共享數據,處理基本的queue,pipe外,還提供了更高層次的封裝。使用multiprocessing.Manager可以簡單地使用這些高級接口。
Manager支持的類型有list,dict,Namespace, Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
1 import multiprocessing 2 import time 3 4 5 def worker(d, key, value): 6 d[key] = value 7 8 if __name__ == ‘__main__‘: 9 mgr = multiprocessing.Manager() 10 11 d = mgr.dict() 12 jobs = [multiprocessing.Process(target=worker, args=(d, i, i*2)) 13 for i in range(10) 14 ] 15 for j in jobs: 16 j.start() 17 for j in jobs: 18 j.join() 19 print(‘Results:‘ ) 20 for key, value in enumerate(dict(d)): 21 print("%s=%s:%s" % (key, value, d[value])) 22 23 24 print("================================================================") 25 26 manager = multiprocessing.Manager() 27 Global = manager.Namespace() 28 Global.x = 10 29 Global.y = ‘hello‘ 30 print(Global) 31 32 print("==================================================================")
問題:
列表不可變
在學習python多進程管理manager時候,當不使用join對當前進程(主進程)進行阻塞時會報錯:
這樣進行一下總結:在使用manager管理/進行多進程及其數據交互時候,必須對每一個manager內的進程進行join-------待所有子進程完成後再回到主進程。
多進程之進程池
1 import time 2 from multiprocessing import Pool 3 4 def worker(): 5 for i in range(10): 6 print("From worker %s"%i) 7 time.sleep(0.5) 8 9 def foo(): 10 for i in range(10): 11 print("From foo %s"%i) 12 time.sleep(0.5) 13 14 def bar(): 15 for i in range(10): 16 print("From bar %s"%i) 17 time.sleep(0.5) 18 19 if __name__ == "__main__": 20 pool = Pool(4) # 創建Pool對象, 3 表示同時最多可以增加 3 條進程 21 pool.apply_async(worker) 22 pool.apply_async(worker) 23 pool.apply_async(worker) 24 pool.apply_async(foo) 25 pool.apply_async(foo) 26 pool.apply_async(foo) 27 pool.apply_async(bar) 28 pool.apply_async(bar) 29 pool.apply_async(bar) 30 31 pool.close() # 關閉進程池,禁止添加任務 32 pool.join() # 等待子進程結束後,主進程才往下走 33 print("Is done...")
並發之協程
1 import time 2 3 def consumer(): 4 r = ‘‘ 5 while True: 6 # 3、consumer通過yield拿到消息,處理,又通過yield把結果傳回; 7 # yield指令具有return關鍵字的作用。然後函數的堆棧會自動凍結(freeze)在這一行。 8 # 當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時, 9 # 就會從yield代碼的下一行開始,繼續執行,再返回下一次叠代結果。通過這種方式,叠代器可以實現無限序列和惰性求值。 10 n = yield r 11 if not n: 12 return 13 print(‘[CONSUMER] ←← Consuming %s...‘ % n) 14 time.sleep(1) 15 r = ‘200 OK‘ 16 def produce(c): 17 # 1、首先調用c.next()啟動生成器 18 next(c) 19 n = 0 20 while n < 5: 21 n = n + 1 22 print(‘[PRODUCER] →→ Producing %s...‘ % n) 23 # 2、然後,一旦生產了東西,通過c.send(n)切換到consumer執行; 24 cr = c.send(n) 25 # 4、produce拿到consumer處理的結果,繼續生產下一條消息; 26 print(‘[PRODUCER] Consumer return: %s‘ % cr) 27 # 5、produce決定不生產了,通過c.close()關閉consumer,整個過程結束。 28 c.close() 29 if __name__==‘__main__‘: 30 # 6、整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。 31 c = consumer() 32 produce(c)
協程封裝之greenlet
1 import greenlet 2 import time 3 import random 4 5 """ 6 創建方法:greenlet.greenlet(self, run=None, parent=None) 7 主要方法: 8 a.switch() 切換到 a 裏面執行 9 """ 10 def foo(): 11 for i in range(10): 12 print("foo:",i) 13 time.sleep(random.random()) 14 gb.switch() 15 16 17 def bar(): 18 for i in range(10): 19 print("bar:", i) 20 time.sleep(random.random()) 21 gf.switch() 22 23 if __name__ == "__main__": 24 gf = greenlet.greenlet(foo) 25 gb = greenlet.greenlet(bar) 26 27 gf.switch()
協程封裝之 gevent
1 import gevent 2 from gevent import monkey 3 import time 4 import random 5 6 monkey.patch_all() # 如果遇到 IO 阻塞,那麽就切換到下一個 協程的程序 7 8 def foo(): 9 for i in range(10): 10 print("foo:",i) 11 time.sleep(random.random()) 12 13 14 def bar(): 15 for i in range(10): 16 print("bar:", i) 17 time.sleep(random.random()) 18 19 20 if __name__ == "__main__": 21 gevent.joinall([gevent.spawn(foo), 22 gevent.spawn(bar)]) 23 24 25 # 固定用法,將裏面的函數放入到 協程的執行序列中
python queue