Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊
Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL
Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊
Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒
一、python多執行緒
對於I/O操作的時候,程序與執行緒的效能差別不大,甚至由於執行緒更輕量級,效能更高。這裡的I/O包括網路I/O和檔案I/O
1、例項
假如利用socket傳送http請求,也就是網路I/O。爬取列表網頁中的寫href連結,然後獲取href連結之後,在爬去連結的網頁詳情。
如果不適用多執行緒的話,程式序列的執行,結果就是要先等待列表網頁獲取所有的href的連結之後,才可以逐個的爬去href連結所指的網頁詳情,這就使得等待時間很長。
如果使用多執行緒程式設計,執行緒A執行第一個列表網頁程式,遇到I/O操作,GIL釋放,當獲取到第一個href連結之後,執行緒B就自動的去獲取href連結所指的網頁詳情。
2、多執行緒實現
使用sleep模擬網路I/IO
# test3.py import time import threading def get_detail_html(url): print("get detail html started") time.sleep(2) print("get detail html end") def get_detail_url(url): print("get detail url started") time.sleep(4) print("get detail url end") if __name__ == "__main__": # 函式方法 arg 為函式引數 thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() # 子執行緒1,2開始 thread1.start() thread2.start() print ("last time: {}".format(time.time()-start_time)) # 執行結果 get detail html started get detail url started last time: 0.0019958019256591797 # 忽略為0 get detail html end get detail url end
按照上面執行緒並行執行的邏輯應該是列印時間為2秒,但是結果卻為0。
任何程序預設就會啟動一個執行緒,該執行緒稱為主執行緒,主執行緒又可以啟動新的執行緒。上面的thread1與thread2就是主執行緒啟動的兩個新的執行緒,那麼在兩個子執行緒啟動之後,主執行緒中其餘的程式段print函式也在並行執行,所以時間為0。當兩個子執行緒執行完畢之後,主執行緒退出,程序關閉,程式執行結束。才會打印出get detail html end,get detail url end。
3、守護執行緒
那麼如何使得主執行緒退出的時候子執行緒也退出。或者說,主執行緒推出的時候kill掉子執行緒?
<1>、將子執行緒設定成守護執行緒
# test4.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
thread1.setDaemon(True)
thread2.setDaemon(True)
# 將兩個執行緒設定為守護執行緒,即主執行緒退出,這兩個子執行緒也退出,kill
start_time = time.time()
# 子程開始
thread1.start()
thread2.start()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
last time: 0.0
將兩個執行緒設定為守護執行緒,即主執行緒退出,這兩個守護執行緒也退出。列印結果中執行到print之後直接程式結束。
由於兩個執行緒的時間不相同,那麼兩者有什麼區別呢
<2>、先將thread1設定為守護執行緒
# test5.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
thread1.setDaemon(True) # 只將thread設定為守護執行緒
# thread2.setDaemon(True)
start_time = time.time()
thread1.start()
thread2.start()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 結果
get detail html started
get detail url started
last time: 0.000997781753540039
get detail html end
get detail url end
只將thread1設定為守護執行緒之後,由於thread2的sleep時間為4秒,所以主執行緒仍會等待thread2執行結束之後才退出,而thread1由於時間為2秒,所以也會列印。
<3>、先將thread2設定為守護執行緒
# test6.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
# thread1.setDaemon(True)
thread2.setDaemon(True)
start_time = time.time()
thread1.start()
thread2.start()
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
last time: 0.0029969215393066406
get detail html end
由於只將thread2設定為守護執行緒,print函式執行結束的時候會首先kill掉thread2執行緒。但是由於thread1執行緒還未結束,程式仍會等待兩秒輸出get detail html end才結束。
4、執行緒阻塞
上面說了如何在主執行緒結束的時候,直接kill掉子執行緒。那麼如何使子執行緒執行結束才執行主執行緒,就是阻塞主程序。
<1>、結束兩個子執行緒
# test7.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 子執行緒程結束
thread1.join()
thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
#輸出
get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.001712799072266
由於呼叫了兩個thread的join方法,主執行緒阻塞,當子執行緒結束之後,print函式執行後主執行緒退出,程式結束。
<2>、結束thread1執行緒
# test8.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 1執行緒程結束
thread1.join()
# thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
get detail html end
last time: 2.001251220703125
get detail url end
由於呼叫了thread1的join方法,阻塞主執行緒,thread1直接結束之後print列印時間,但是對另一個執行緒沒有影響。所以在列印last time: 2.001251220703125時間,等待兩秒列印get detail url end,主執行緒才會退出。
<3>、結束thread2執行緒
# test9.py
import time
import threading
def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 函式方法 arg 為函式引數
thread1 = threading.Thread(target=get_detail_html, args=("",))
thread2 = threading.Thread(target=get_detail_url, args=("",))
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 2執行緒程結束
# thread1.join()
thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
# 輸出
get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.002287864685059
由於thread2執行緒的sleep的時間為4秒,期間thread1已經執行完畢,所以列印時間為4秒。
5、Thread類繼承式建立
同樣的也可以使用類繼承的方法建立執行緒例項,效果一樣的
# test10.py
import time
import threading
class GetDetailHtml(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("get detail html started")
time.sleep(2)
print("get detail html end")
class GetDetailUrl(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("get detail url started")
time.sleep(4)
print("get detail url end")
if __name__ == "__main__":
# 類繼承方法
thread1 = GetDetailHtml("get_detail_html")
thread2 = GetDetailUrl("get_detail_url")
start_time = time.time()
# 子執行緒開始
thread1.start()
thread2.start()
# 子執行緒程結束
thread1.join()
thread2.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
二、執行緒通訊
1、共享變數通訊
共享變數通訊,是執行緒間通訊最簡單的方式,但也是最容易出問題的方式。以上面爬去頁面和網頁連結的例項進行擴充套件。在上面的例項中,因為要解決請求列表頁面的時候網路時延問題,引入了多執行緒並行,邊爬去列表頁獲取href,再爬取href指向的想起那個頁面,下面將爬去的頁面存入列表實現。
# test11.py
import threading
import time
detail_url_list = [] # 儲存著爬取下來的href連結
def get_detail_html(detail_url_list): # 引數這裡作為對全域性變數的引用
while True:
# 使用while語句使得執行緒持續爬去
if len(detail_url_list):
url = detail_url_list.pop()
print('get detail html start')
time.sleep(2)
print('get detail html end')
def get_detail_url(detail_url_list):
while True:
# 使用while語句使得執行緒持續爬取
print('get detail url start')
time.sleep(4)
for i in range(20):
detail_url_list.append('http://www.xxxx.com/{}.html'.format(i))
print('get detail end')
if __name__ == "__main__":
thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_list,))
for i in range(10):
# 為了模擬多個執行緒併發,這裡建立了十個子執行緒
html_thread = threading.Thread(target=get_detail_html, args=(detail_url_list,))
html_thread.start()
start_time = time.time()
print("last time: {}".format(time.time() - start_time))
但是上面問題也會很明顯,在GIL的示例中,total變數由於變數共享的緣故,沒有按照預期的執行。而在上面的爬蟲例項中,detail_url_list作為全域性共享變數,pop操作,append操作,多個執行緒共用資源,都不是執行緒安全的操作,會出現問題。所以就必須給變數加上鎖,保持安全性。為了擺脫這種問題,使用訊息佇列通訊
2、訊息佇列通訊
訊息佇列通訊也就是使用Queue這個類來表示變數,從而達到執行緒安全,由於Queue這個類內部封裝了deque,也就是python中的雙端佇列。雙端對列本身就是安全界別很高的一種型別,實現執行緒間的安全操作。
# test12.py
#通過queue的方式進行執行緒間同步
from queue import Queue
import time
import threading
def get_detail_html(queue):
#爬取文章詳情頁
while True:
url = queue.get()
print("get detail html started")
time.sleep(2)
print("get detail html end")
def get_detail_url(queue):
# 爬取文章列表頁
while True:
print("get detail url started")
time.sleep(4)
for i in range(20):
queue.put("http://projectsedu.com/{id}".format(id=i))
print("get detail url end")
if __name__ == "__main__":
detail_url_queue = Queue(maxsize=1000)
thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
for i in range(10):
html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
html_thread.start()
start_time = time.time()
# detail_url_queue.task_done()
detail_url_queue.join()
#當主執行緒退出的時候, 子執行緒kill掉
print ("last time: {}".format(time.time()-start_time))
使用了訊息佇列替代共享變數
- Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。
- q.put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。沒有引數時,q.put的個數大於佇列數時,會一直阻塞住。
- q.get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常。沒有引數時,q.get的個數大於佇列數時,會一直阻塞住。
- q.put_nowait()等價於q.put(block=False)佇列滿時再存也會拋異常
- q.get_nowait()等價於q.get(block=False)佇列為空取不出時會拋異常
- q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常
- q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止
三、執行緒同步
1、加鎖
在上面的第一個GIL示例中,由於GIL釋放的緣故,多個執行緒共享變數,導致total的值不像預期那樣為0的問題發生,也就是如何執行緒同步。最簡單的方式就是加鎖。加鎖使得一個執行緒在佔用資源的時候,別的執行緒都必須等待,只有當這個執行緒主動釋放資源的時候,其他執行緒才能使用資源,也就是資源佔用互斥。這樣就可要保證共享變數的安全性。
# test13.py
from threading import Lock
#在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等
total = 0
lock = Lock()
def add():
global lock
global total
for i in range(1000000):
lock.acquire() # 加鎖
total += 1
lock.release() # 釋放鎖
def desc():
global total
global lock
for i in range(1000000):
lock.acquire() # 加鎖
total -= 1
lock.release() # 釋放鎖
import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
# 輸出
在等待了一段時間後輸出0
0 # total的列印結果為0
加鎖的時候要保證加上鎖執行完成之後,就要釋放掉,不然會一直佔用資源。
加鎖的結果使得在執行total-=1或者total+=1的賦值語句的時候,該賦值語句對應的多條位元組碼指令執行完之後,才會其他程序執行修改total值。該執行緒佔用了鎖,所以其他執行緒不能修改total值,只有當該釋放了鎖,其他執行緒才能修改total值,不會造成修改共享變數的衝突。這是加鎖的好處,那麼代價也十分明顯
加鎖缺點:
- 加鎖效能
- 死鎖風險
補充:另外自己加的鎖使使用者級別的與GIL不同。
<1>、效能問題
本來的多執行緒,由於加鎖的緣故,首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行。並且由於來回切換執行緒的緣故,程式效能變得低下
將test2.py改成如下
# test14.py
total = 0
def add():
global total
for i in range(1000000):
total += 1
def desc():
global total
for i in range(1000000):
total -= 1
import threading
import time
start_time = time.time()
add()
desc()
print(total)
print("last time: {}".format(time.time() - start_time))
# 輸出
0
last time: 0.314816951751709
這是簡單的單執行緒程式,持續時間為0.3秒。沒有使用thread多執行緒
下面使用threading多執行緒,並且加鎖
# test15.py
from threading import Lock
total = 0
lock = Lock()
def add():
global lock
global total
for i in range(1000000):
lock.acquire()
total += 1
lock.release()
def desc():
global total
global lock
for i in range(1000000):
lock.acquire()
total -= 1
lock.release()
import threading
import time
start_time = time.time()
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
print("last time: {}".format(time.time() - start_time))
# 輸出
0
last time: 5.062084674835205
使用了多執行緒,為了保證共享變數的安全性操作,執行緒同步,加鎖導致類似單執行緒,程式的執行時間達到了5秒鐘。可見執行緒之間的切換十分浪費時間。所以說,CPython的GIL本意是用來保護所有全域性的直譯器和環境狀態變數的,如果去掉GIL,就需要更多的更細粒度的鎖對直譯器的眾多全域性狀態進行保護。做過測試將GIL去掉,加入更細粒度的鎖。但是實踐檢測對單執行緒來說,效能更低。
<2>、死鎖風險
來看下面例子
這裡為了在一個執行緒中多次呼叫lock,使用可重入的鎖Rlock物件
Lock與Rlock區別:
RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。注意:如果使用RLock,那麼acquire和release必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的瑣。
# test15.py
from threading import RLock # 可重入的鎖
total = 0
lock = RLock()
def add():
global lock
global total
for i in range(1000000):
lock.acquire()
lock.acquire() # 這裡加了兩次鎖
total += 1
lock.release()
def desc():
global total
global lock
for i in range(1000000):
lock.acquire()
total -= 1
lock.release()
import threading
import time
start_time = time.time()
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
print("last time: {}".format(time.time() - start_time))
由於在add函式中加了兩次鎖lock.acquire(),結果就是執行緒永遠都不獲釋放掉共享變數。一直佔用資源,其他的執行緒請求資源沒有結果,多個執行緒掛起,既不能執行,也無法結束,一直處於等待狀態,造成死鎖,只能靠作業系統強制終止。最終程式也沒有任何結果輸出。
所以在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等
還有就是,執行緒的相互等待,假如記憶體中又兩中資源a和b,而執行緒A(a,b)和執行緒B(a,b)都申請資源。
第一步
執行緒A先申請a資源,執行緒B先申請b資源,因此沒有問題
第二步
由於a,b均已被A,B佔用,並且A申請b,B申請b,在位獲得新的資源的時候兩者都不會退出對現有資源的佔用,這就造成了兩個執行緒相互等待,並且這種等待會一直持續下去,造成死鎖。
2、執行緒複雜通訊
在上面看到執行緒進行通訊的時候需要加鎖,如果如何使用鎖進行執行緒的對話功能,例如
- 執行緒A:hello,你好啊
- 執行緒B:你好
- 執行緒A:吃飯了嗎
- 執行緒B:吃過了,你呢
- 執行緒A:我也吃過了,咱們去搞PVM吧
- 執行緒B:ok,走吧
<1>、簡單鎖
像上面的執行緒通訊,如果使用簡單的Rlock鎖
import threading
class ThreadA(threading.Thread):
def __init__(self, lock):
super().__init__(name="執行緒A")
self.lock = lock
def run(self):
self.lock.acquire()
print("{} : hello, 你好 ".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : 吃過飯了嗎 ".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : 我也吃過了,咱們去找PVM吧".format(self.name))
self.lock.release()
class ThreadB(threading.Thread):
def __init__(self, lock):
super().__init__(name="執行緒B")
self.lock = lock
def run(self):
self.lock.acquire()
print("{} : 你好 ".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : 吃過了,你呢".format(self.name))
self.lock.release()
self.lock.acquire()
print("{} : ok,走吧 ".format(self.name))
self.lock.release()
if __name__ == "__main__":
lock = threading.RLock()
a_thread = ThreadA(lock)
b_thread = ThreadB(lock)
a_thread.start()
b_thread.start()
# 輸出
執行緒A : hello, 你好
執行緒A : 吃過飯了嗎
執行緒A : 我也吃過了,咱們去找PVM吧
執行緒B : 你好
執行緒B : 吃過了,你呢
執行緒B : ok,走吧
顯然沒有完成執行緒通訊的基本功能。
<2>、threading.Condition()
解決方案:線上程複雜通訊時使用threading.Condition(),可以把Condiftion理解為一把高階的瑣,它提供了比Lock, RLock更高階的功能,允許我們能夠控制複雜的執行緒同步問題。threadiong.Condition在內部維護一個瑣物件(預設是RLock),可以在建立Condigtion物件的時候把瑣物件作為引數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的呼叫內部瑣物件的對應的方法而已。Condition還提供wait方法、notify方法、notifyAll方法。這些方法只有在佔用瑣(acquire)之後才能呼叫,否則將會報RuntimeError異常。
方法介紹:
- acquire()/release():獲得/釋放 Lock
- wait([timeout]):執行緒掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續執行。wait()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。呼叫wait()會釋放Lock,直至該執行緒被Notify()、NotifyAll()或者超時執行緒又重新獲得Lock.
- notify(n=1):通知其他執行緒,那些掛起的執行緒接到這個通知之後會開始執行,預設是通知一個正等待該condition的執行緒,最多則喚醒n個等待的執行緒。notify()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。notify()不會主動釋放Lock。
- notifyAll(): 如果wait狀態執行緒比較多,notifyAll的作用就是通知所有執行緒
原始碼分析:
# 部分原始碼
_PyRLock = _RLock
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
進入Condition這個類中檢視原始碼發現,在預設的情況下,Condition是封裝的鎖物件是Rlock,另外Condition類實現了__enter__,__exit__兩個特殊方法,由鴨子型別可知,說明可以像上下文管理器一樣使用它。
而在__enter__與__exit__兩個特殊方法中分別呼叫了self.acquire()與self.release()兩個方法,所以說不使用with上下文管理器的話也可以直接使用acquire()與release()兩個方法進行加鎖釋放鎖。
解決例項:
class ThreadA(threading.Thread):
def __init__(self, cond):
super().__init__(name="執行緒A")
self.cond = cond
def run(self):
with self.cond:
print("{} : hello, 你好 ".format(self.name)) # 4
self.cond.notify() # 5
self.cond.wait() # 6
print("{} : 吃過飯了嗎 ".format(self.name))
self.cond.notify()
self.cond.wait()
print("{} : 我也吃過了,咱們去找PVM吧".format(self.name))
self.cond.notify()
self.cond.wait()
class ThreadB(threading.Thread):
def __init__(self, cond):
super().__init__(name="執行緒B")
self.cond = cond
def run(self):
with self.cond:
self.cond.wait() # 2
print("{} : 你好 ".format(self.name)) # 7
self.cond.notify()
self.cond.wait()
print("{} : 吃過了,你呢".format(self.name))
self.cond.notify()
self.cond.wait()
print("{} : ok,走吧 ".format(self.name))
self.cond.notify()
if __name__ == "__main__":
cond = threading.Condition()
b_thread = ThreadB(cond)
a_thread = ThreadA(cond)
b_thread.start() # 1
a_thread.start() # 3
# 輸出結果
執行緒A : hello, 你好
執行緒B : 你好
執行緒A : 吃過飯了嗎
執行緒B : 吃過了,你呢
執行緒A : 我也吃過了,咱們去找PVM吧
執行緒B : ok,走吧
完成執行緒之間的複雜通訊。
這裡需要注意的是:兩個執行緒之間的開啟先後順序。b執行緒需要先於a執行緒開啟。原因:
1 先開啟b執行緒
2 wait方法會首先上一把鎖,執行緒處於阻塞態
3 開啟a執行緒
4 列印 執行緒A:hello,你好啊
5 這個時候cond物件呼叫notify方法,會釋放掉之前上的鎖
6 呼叫wait方法,為自己又上了一把鎖
7 由於notify方法已經打開了鎖,或繼續執行,列印 執行緒B:你好
其實wait方法會維持一個鎖,而這個鎖只有notify方法才能開啟。如果a執行緒先開啟,則是呼叫了wait方法維持了一把鎖,並沒有其他的執行緒會呼叫notify方法釋放這把鎖。則最終只會輸出 執行緒A : hello, 你好 ,而執行緒一直處於死鎖狀態。
補充:Condition物件會維持兩層鎖,而不是兩個鎖,更不是簡單的一個鎖。在開啟或者關閉上下文管理器物件的時候__enter__,__exit__方法會開啟釋放掉底層鎖(直接使用acquire()與release()兩個方法也行),這一層鎖是一個。而在持續連續呼叫的wait和notify方法則是對第二層鎖進行操作,而這一層所在Condition物件內部是封裝到一個雙端佇列中,在每次呼叫wait的時候分配一把鎖並放入到cond的等待佇列中,等到notify方法的喚醒。可以進入Condition原始碼檢視
3、Semaphore(訊號量)
同時只有n個執行緒可以獲得semaphore,即可以限制最大連線數為n),也就是執行緒最大併發量的控制。
Semaphore管理一個內建的計數器,每當呼叫acquire()時內建計數器-1;呼叫release() 時內建計數器+1;計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。
訊號量使得一個程式中有很多個執行緒,但是隻有n多個執行緒獲得訊號量,處於執行態
class HtmlSpider(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
time.sleep(2)
print("got html text success, time is {}".format(time.ctime()))
self.sem.release()
class UrlProducer(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(20):
self.sem.acquire()
html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
html_thread.start()
if __name__ == "__main__":
sem = threading.Semaphore(4) # 每次只有4個執行緒獲取訊號量
url_producer = UrlProducer(sem)
url_producer.start()
在上面示例中,模擬爬蟲,建立20個子執行緒爬取html頁面,如果不是用訊號量,二十條資料一次返回。使用訊號量,使得每次只有4個執行緒執行。
# 輸出結果
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:55 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:57 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:17:59 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:01 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
got html text success, time is Tue Nov 20 17:18:03 2018
每個兩秒列印一次結果,一次四條資料。總共二十個。
4、執行緒池
Python標準庫為我們提供了threading和multiprocessing模組編寫相應的多執行緒/多程序程式碼,但是當專案達到一定的規模,頻繁建立/銷燬程序或者執行緒是非常消耗資源的,這個時候我們就要編寫自己的執行緒池/程序池,以空間換時間。但從Python3.2開始,標準庫為我們提供了concurrent.futures模組,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫執行緒池/程序池提供了直接的支援。
concurrent.futures模組的基礎是Exectuor,Executor是一個抽象類,它不能被直接使用。但是它提供的兩個子類ThreadPoolExecutor和ProcessPoolExecutor卻是非常有用,顧名思義兩者分別被用來建立執行緒池和程序池的程式碼。我們可以將相應的tasks直接放入執行緒池/程序池,不需要維護Queue來操心死鎖的問題,執行緒池/程序池會自動幫我們排程。
Future你可以把它理解為一個在未來完成的操作,這是非同步程式設計的基礎,傳統程式設計模式下比如我們操作queue.get的時候,在等待返回結果之前會產生阻塞,cpu不能讓出來做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。
<1>、使用submit來操作執行緒池/程序池:
from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
for url in URLS:
future = executor.submit(load_url,url)
print(future.done())
print('主執行緒')
# 執行結果:
False
False
False
主執行緒
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75633 bytes
'http://www.163.com' page is 703974 bytes
根據執行結果,使用submit方法來往執行緒池中加入一個task,submit返回一個Future物件,對於Future物件可以簡單地理解為一個在未來完成的操作。由於執行緒池非同步提交了任務,主執行緒並不會等待執行緒池裡建立的執行緒執行完畢,所以執行了print('主執行緒'),相應的執行緒池中建立的執行緒並沒有執行完畢,故future.done()返回結果為False。
<2>、 用map來操作執行緒池/程序池:
from concurrent.futures import ThreadPoolExecutor
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
executor.map(load_url,URLS)
print('主執行緒')
# 結果
主執行緒
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75633 bytes
'http://www.163.com' page is 703974 bytes
從執行結果可以看出,map是按照URLS列表元素的順序返回的,並且寫出的程式碼更加簡潔直觀,可以根據具體的需求任選一種。
<3>、wait
wait方法接會返回一個tuple(元組),tuple中包含兩個set(集合),一個是completed(已完成的)另外一個是uncompleted(未完成的)。使用wait方法的一個優勢就是獲得更大的自由度,它接收三個引數FIRST_COMPLETED, FIRST_EXCEPTION 和ALL_COMPLETE,預設設定為ALL_COMPLETED
如果採用預設的ALL_COMPLETED,程式會阻塞直到執行緒池裡面的所有任務都完成,再執行主執行緒:
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
future = executor.submit(load_url,url)
f_list.append(future)
print(wait(f_list))
print('主執行緒')
# 輸出
'https://www.baidu.com/' page is 227 bytes
'https://github.com/' page is 75627 bytes
'http://www.163.com' page is 703988 bytes
DoneAndNotDoneFutures(done={<Future at 0x2ab6ea89d30 state=finished returned NoneType>, <Future at 0x2ab6ea89240 state=finished returned NoneType>, <Future at 0x2ab6e93f7b8 state=finished returned NoneType>}, not_done=set())
主執行緒
如果採用FIRST_COMPLETED引數,程式並不會等到執行緒池裡面所有的任務都完成:
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import urllib.request
URLS = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
with urllib.request.urlopen(url, timeout=60) as conn:
print('%r page is %d bytes' % (url, len(conn.read())))
executor = ThreadPoolExecutor(max_workers=3)
f_list = []
for url in URLS:
future = executor.submit(load_url,url)
f_list.append(future)
print(wait(f_list,return_when='FIRST_COMPLETED'))
print('主執行緒')
# 輸出
'https://www.baidu.com/' page is 227 bytes
DoneAndNotDoneFutures(done={<Future at 0x2cd5581a240 state=finished returned NoneType>}, not_done={<Future at 0x2cd5581ad30 state=running>, <Future at 0x2cd556cf7f0 state=running>})
主執行緒
'http://www.163.com' page is 703991 bytes
'https://github.com/' page is 75625 bytes
<4>、回撥函式
import requests
import time
from concurrent.futures import ThreadPoolExecutor
def get(url):
print('GET {}'.format(url))
response = requests.get(url)
time.sleep(2)
if response.status_code == 200: # 200代表狀態:下載成功了
return {'url': url, 'content': response.text}
def parse(res):
print('%s parse res is %s' % (res['url'], len(res['content'])))
return '%s parse res is %s' % (res['url'], len(res['content']))
def save(res):
print('save', res)
def task(res):
res = res.result()
par_res = parse(res)
save(par_res)
if __name__ == '__main__':
urls = [
'http://www.cnblogs.com',
'https://www.python.org',
'https://www.openstack.org',
]
pool = ThreadPoolExecutor(2)
for i in urls:
pool.submit(get, i).add_done_callback(task)
'''
這裡的回撥函式拿到的是一個物件。得
先把返回的res得到一個結果。即在前面加上一個res.result()
誰好了誰去掉回撥函式
回撥函式也是一種程式設計思想。不僅線上程池用,在程序池也用
'''
pool.shutdown() # 相當於程序池裡的close和join
# 輸出
GET http://www.cnblogs.com
GET https://www.python.org
https://www.python.org parse res is 50114
save https://www.python.org parse res is 50114
GET https://www.openstack.org
https://www.openstack.org parse res is 63253
save https://www.openstack.org parse res is 63253
http://www.cnblogs.com parse res is 40382
save http://www.cnblogs.com parse res is 40382