Python多執行緒與佇列
Python多執行緒與Queue佇列多執行緒在感官上類似於同時執行多個程式,雖然由於GIL的存在,在Python中無法實現執行緒的真正並行,但是對於某些場景,多執行緒仍不失為一個有效的處理方法:
1,不緊急的,無需阻塞主執行緒的任務,此時可以利用多執行緒在後臺慢慢處理;
2,IO密集型操作,比如檔案讀寫、使用者輸入和網路請求等,此時多執行緒可以近似達到甚至優於多程序的表現;
多執行緒的基本使用不再贅述,以下語法便可輕鬆實現:
1 def task(args1, args2): 2 pass 3 4 Thread( 5 target=task, 6 args=(args1, args2) 7 ).start()
這裡我們重點關注執行緒通訊。
假設有這麼一種場景:有一批源資料,指定一個操作係數N,需要分別對其進行與N的加減乘除操作,並將結果彙總。
當然這裡的加減乘除只是一種簡單處理,在實際的生產環境中,它其實代表了一步較為複雜的業務操作,幷包含了較多的IO處理。
自然我們想到可以開啟多執行緒處理,那麼緊接著的問題便是:如何劃分執行緒,是根據處理步驟劃分,還是根據源資料劃分?
對於前者,我們把涉及的業務操作單獨劃分位一個執行緒,即有4個執行緒分別進行加減乘除的操作,顯然上一個執行緒的結果是下一個執行緒的輸入,這類似於流水線操作;
而後者則是把源資料分為若干份,每份啟動一個執行緒進行處理,最終把結果彙總。
尤其是在一些複雜的場景下,會加大單個執行緒的出錯概率和測試難度。
那麼我們將開闢4個執行緒,分別執行加減乘除操作。最後一個除法執行緒結束則任務完成:
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 from Queue import Queue 5 from threading import Thread 6 7 8 class NumberHandler(object): 9 def __init__(self, n): 10 self.n = n 11 12 def add(self, num): 13 return num + self.n 14 15 def subtract(self, num): 16 return num - self.n 17 18 def multiply(self, num): 19 return num * self.n * self.n 20 21 def divide(self, num): 22 return num / self.n 23 24 25 class ClosableQueue(Queue): 26 SENTINEL = object() 27 28 def close(self): 29 self.put(self.SENTINEL) 30 31 def __iter__(self): 32 while True: 33 item = self.get() 34 try: 35 if item is self.SENTINEL: 36 return 37 yield item 38 finally: 39 self.task_done() 40 41 42 class StoppableWorker(Thread): 43 def __init__(self, func, in_queue, out_queue): 44 super(StoppableWorker, self).__init__() 45 self.in_queue = in_queue 46 self.out_queue = out_queue 47 self.func = func 48 49 def run(self): 50 for item in self.in_queue: 51 result = self.func(item) 52 self.out_queue.put(result) 53 print self.func 54 55 56 if __name__ == '__main__': 57 source_queue = ClosableQueue() 58 add_queue = ClosableQueue() 59 subtract_queue = ClosableQueue() 60 multiply_queue = ClosableQueue() 61 divide_queue = ClosableQueue() 62 result_queue = ClosableQueue() 63 64 number_handler = NumberHandler(5) 65 66 threads = [ 67 StoppableWorker(number_handler.add, add_queue, subtract_queue), 68 StoppableWorker(number_handler.subtract, subtract_queue, multiply_queue), 69 StoppableWorker(number_handler.multiply, multiply_queue, divide_queue), 70 StoppableWorker(number_handler.divide, divide_queue, result_queue), 71 ] 72 73 for _thread in threads: 74 _thread.start() 75 76 for i in range(10): 77 add_queue.put(i) 78 79 add_queue.close() 80 add_queue.join() 81 print 'add job done...' 82 subtract_queue.close() 83 subtract_queue.join() 84 print 'subtract job done...' 85 multiply_queue.close() 86 multiply_queue.join() 87 print 'multiply job done...' 88 divide_queue.close() 89 divide_queue.join() 90 print 'divide job done...' 91 result_queue.close() 92 93 print "%s items finished, result: %s" % (result_queue.qsize(), result_queue) 94 95 for i in result_queue: 96 print i
執行結果:
執行緒執行日誌:
總的結果:
可見執行緒交叉執行,但是任務卻是順序結束,這符合我們的預期。
值得注意的是,我們在ClosableQueue定義了一個close()方法,通過放入一個特殊的類變數SENTINEL告訴佇列應該關閉。此外,由於直接加減乘除結果不變,因此我特意乘了兩次來便於我們判斷結果。
總結:
1. Queue是一種高效的任務處理方式,它可以把任務處理流程劃分為若干階段,並使用多條python執行緒來同時執行這些子任務;
2. Queue類具備阻塞式的佇列操作、能夠指定緩衝區尺寸,而且還支 持join方法,這使得開發者可以構建出健壯的流水線。
&n