理解Python併發程式設計
前言
對我來說,程式設計的樂趣之一是想辦法讓程式執行的越來越快,程式碼越寫越優雅。在剛開始學習併發程式設計時,相信你它會有一些困惑,本文將解釋多個併發開發的問題並幫助你快速瞭解併發程式設計的不同場景和應該使用的解決方案。
GIL
Python(特指CPython)的多執行緒的程式碼並不能利用多核的優勢,而是通過著名的全域性解釋鎖(GIL)來進行處理的。如果是一個計算型的任務,使用多執行緒GIL就會讓多執行緒變慢。我們舉個計算斐波那契數列的例子:
12345678910111213141516171819202122232425262728293031323334353637383940 |
# coding=utf-8 |
執行的結果你猜猜會怎麼樣:
123 | ❯ python profile_thread.pyCOST: 5.05716490746COST: 6.75599503517 |
這種情況還不如不用多執行緒!
GIL是必須的,這是Python設計的問題:Python直譯器是非執行緒安全的。這意味著當從執行緒內嘗試安全的訪問Python物件的時候將有一個全域性的強制鎖。 在任何時候,僅僅一個單一的執行緒能夠獲取Python物件或者C API。每100個位元組的Python指令直譯器將重新獲取鎖,這(潛在的)阻塞了I/O操作。因為鎖,CPU密集型的程式碼使用執行緒庫時,不會獲得性能的提高(但是當它使用之後介紹的多程序庫時,效能可以獲得提高)。
那是不是由於GIL的存在,多執行緒庫就是個「雞肋」呢?當然不是。事實上我們平時會接觸非常多的和網路通訊或者資料輸入/輸出相關的程式,比如網路爬蟲、文字處理等等。這時候由於網路情況和I/O的效能的限制,Python直譯器會等待讀寫資料的函式呼叫返回,這個時候就可以利用多執行緒庫提高併發效率了。
同步機制
Python執行緒包含多種同步機制:
1. Semaphore(訊號量)
在多執行緒程式設計中,為了防止不同的執行緒同時對一個公用的資源(比如全部變數)進行修改,需要進行同時訪問的數量(通常是1)。訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。
123456789101112131415161718192021222324 | import timefrom random import randomfrom threading import Thread, Semaphoresema = Semaphore(3)def foo(tid): with sema: print '{} acquire sema'.format(tid) wt = random() * 2 time.sleep(wt) print '{} release sema'.format(tid)threads = []for i in range(5): t = Thread(target=foo, args=(i,)) threads.append(t) t.start()for t in threads: t.join() |
這個例子中,我們限制了同時能訪問資源的數量為3。看一下執行的效果:
1234567891011 | ❯ python semaphore.py0 acquire sema1 acquire sema 2 acquire sema2 release sema 3 acquire sema1 release sema 4 acquire sema0 release sema3 release sema4 release sema |
2. Lock(鎖)
Lock也可以叫做互斥鎖,其實相當於訊號量為1。我們先看一個不加鎖的例子:
123456789101112131415161718192021222324 | import timefrom threading import Threadvalue = 0def getlock(): global value new = value + 1 time.sleep(0.001) # 使用sleep讓執行緒有機會切換 value = newthreads = []for i in range(100): t = Thread(target=getlock) t.start() threads.append(t)for t in threads: t.join()print value |
執行一下:
12 | ❯ python nolock.py16 |
大寫的黑人問號。不加鎖的情況下,結果會遠遠的小於100。那我們加上互斥鎖看看:
12345678910111213141516171819202122232425 | import timefrom threading import Thread, Lockvalue = 0lock = Lock()def getlock(): global value with lock: new = value + 1 time.sleep(0.001) value = newthreads = []for i in range(100): t = Thread(target=getlock) t.start() threads.append(t)for t in threads: t.join()print value |
我們對value的自增加了鎖,就可以保證了結果了:
12 | ❯ python lock.py100 |
3. RLock(可重入鎖)
acquire() 能夠不被阻塞的被同一個執行緒呼叫多次。但是要注意的是release()需要呼叫與acquire()相同的次數才能釋放鎖。
4. Condition(條件)
一個執行緒等待特定條件,而另一個執行緒發出特定條件滿足的訊號。最好說明的例子就是「生產者/消費者」模型:
12345678910111213141516171819202122232425262728 | import timeimport threadingdef consumer(cond): t = threading.currentThread() with cond: cond.wait() # wait()方法建立了一個名為waiter的鎖,並且設定鎖的狀態為locked。這個waiter鎖用於執行緒間的通訊 print '{}: Resource is available to consumer'.format(t.name)def producer(cond): t = threading.currentThread() with cond: print '{}: Making resource available'.format(t.name) cond.notifyAll() # 釋放waiter鎖,喚醒消費者condition = threading.Condition()c1 = threading.Thread(name='c1', target=consumer, args=(condition,))c2 = threading.Thread(name='c2', target=consumer, args=(condition,))p = threading.Thread(name='p', target=producer, args=(condition,))c1.start()time.sleep(1)c2.start()time.sleep(1)p.start() |
執行一下:
1234 | ❯ python condition.pyp: Making resource availablec2: Resource is available to consumerc1: Resource is available to consumer |
可以看到生產者傳送通知之後,消費者都收到了。
5. Event
一個執行緒傳送/傳遞事件,另外的執行緒等待事件的觸發。我們同樣的用「生產者/消費者」模型的例子:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 | # coding=utf-8import timeimport threadingfrom random import randintTIMEOUT = 2def consumer(event, l): t = threading.currentThread() while 1: event_is_set = event.wait(TIMEOUT) if event_is_set: try: integer = l.pop() print '{} popped from list by {}'.format(integer, t.name) event.clear() # 重置事件狀態 except IndexError: # 為了讓剛啟動時容錯 passdef producer(event, l): t = threading.currentThread() while 1: integer = randint(10, 100) l.append(integer) print '{} appended to list by {}'.format(integer, t.name) event.set() # 設定事件 time.sleep(1)event = threading.Event()l = []threads = []for name in ('consumer1', 'consumer2'): t = threading.Thread(name=name, target=consumer, args=(event, l)) t.start() threads.append(t)p = threading.Thread(name='producer1', target=producer, args=(event, l))p.start()threads.append(p)for t in threads: t.join() |
執行的效果是這樣的:
123456789101112 | 77 appended to list by producer177 popped from list by consumer146 appended to list by producer146 popped from list by consumer243 appended to list by producer143 popped from list by consumer237 appended to list by producer137 popped from list by consumer233 appended to list by producer133 popped from list by consumer257 appended to list by producer157 popped from list by consumer1 |
可以看到事件被2個消費者比較平均的接收並處理了。如果使用了wait方法,執行緒就會等待我們設定事件,這也有助於保證任務的完成。
6. Queue
佇列在併發開發中最常用的。我們藉助「生產者/消費者」模式來理解:生產者把生產的「訊息」放入佇列,消費者從這個佇列中對去對應的訊息執行。
大家主要關心如下4個方法就好了:
- put: 向佇列中新增一個項。
- get: 從佇列中刪除並返回一個項。
- task_done: 當某一項任務完成時呼叫。
- join: 阻塞直到所有的專案都被處理完。
123456789101112131415161718192021222324252627282930 | # coding=utf-8import timeimport threadingfrom random import randomfrom Queue import Queueq = Queue()def double(n): return n * 2def producer(): while 1: wt = random() time.sleep(wt) q.put((double, wt))def consumer(): while 1: task, arg = q.get() print arg, task(arg) q.task_done()for target in(producer, consumer): t = threading.Thread(target=target) t.start() |
這就是最簡化的佇列架構。
Queue模組還自帶了PriorityQueue(帶有優先順序)和LifoQueue(後進先出)2種特殊佇列。我們這裡展示下執行緒安全的優先順序佇列的用法,
PriorityQueue要求我們put的資料的格式是(priority_number,
data)
,我們看看下面的例子:
123456789101112131415161718192021222324252627282930313233343536373839 | import timeimport threadingfrom random import randintfrom Queue import PriorityQueueq = PriorityQueue()def double(n): return n * 2def producer(): count = 0 while 1: if count > 5: break pri = randint(0, 100) print 'put :{}'.format(pri) q.put((pri, double, pri)) # (priority, func, args) count += 1def consumer(): while 1: if q.empty(): break pri, task, arg = q.get() print '[PRI:{}] {} * 2 = {}'.format(pri, arg, task(arg)) q.task_done() time.sleep(0.1)t = threading.Thread(target=producer)t.start()time.sleep(1)t = threading.Thread(target=consumer)t.start() |
其中消費者是故意讓它執行的比生產者慢很多,為了節省篇幅,只隨機產生5次隨機結果。我們看下執行的效果:
1234567891011121314 | ❯ python priority_queue.pyput :84put :86put :16put :93put :14put :93[PRI:14] 14 * 2 = 28[PRI:16] 16 * 2 = 32[PRI:84] 84 * 2 = 168[PRI:86] 86 * 2 = 172[PRI:93] 93 * 2 = 186[PRI:93] 93 * 2 = 186 |
可以看到put時的數字是隨機的,但是get的時候先從優先順序更高(數字小表示優先順序高)開始獲取的。
執行緒池
面向物件開發中,大家知道建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。無節制的建立和銷燬執行緒是一種極大的浪費。那我們可不可以把執行完任務的執行緒不銷燬而重複利用呢?彷彿就是把這些執行緒放進一個池子,一方面我們可以控制同時工作的執行緒數量,一方面也避免了建立和銷燬產生的開銷。
執行緒池在標準庫中其實是有體現的,只是在官方文章中基本沒有被提及:
1234 | In : from multiprocessing.pool import ThreadPoolIn : pool = ThreadPool(5)In : pool.map(lambda x: x**2, range(5))Out: [0, 1, 4, 9, 16] |
當然我們也可以自己實現一個: