效能測試學習
https://zhuanlan.zhihu.com/p/490353142
1. 程序vs執行緒
程序(process)指的是正在執行的程式的例項,即an instance of a computer that is being executed。用拆字法理解就是:進行中的程式。程式是一個沒有生命的實體,只有處理器執行它的時候才能成為一個活動的實體,稱之為程序。[1]
執行緒(thread)包含於程序之中,是作業系統能夠進行運算排程的最小單元。一條程序中可以併發多個執行緒,而同一條執行緒將共享該程序中的全部系統資源。[2]
每個程序都有自己獨立的地址空間、記憶體和資料棧,因此程序之間通訊不方便,要是用程序間通訊(InterProcess Communication, IPC)。而同一個程序中的執行緒共享資源,因此執行緒間通訊非常方便,但要注意資料同步與互斥的問題。
雖然一條程序中可以併發多個執行緒,但是對於單核CPU而言,同一時間CPU只能執行一個執行緒。
2. thread vs threading
Python處理執行緒的模組有兩個:thread和threading。Python 3已經停用了thread模組[3],並改名為_thread模組。Python 3在_thread模組的基礎上開發了更高階的threading模組,因此以下的講解都是基於threading模組。
3. 如何建立一個執行緒?
根據threading底層程式碼的說明,建立一個執行緒通常有兩種方法:(1)在例項化一個執行緒物件時,將要執行的任務函式以引數的形式傳入;(2)繼承Thread類的同時重寫它的run方法。
threading.Class類
現在我準備建立兩個執行緒,一個執行緒每隔一秒列印一個“1”,另一個執行緒每隔2秒列印一個“2”,如何建立並執行呢?兩種方法如下:
3.1 方法一
import time
import threading
def printNumber(n: int) -> None:
while True:
print(n)
time.sleep(n)
for i in range(1, 3):
t = threading.Thread(target=printNumber, args=(i, ))
t.start()
執行結果如下,控制檯會不停地、交錯地列印“1”和“2”:
執行結果
3.2 方法二
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
# 注意:一定要呼叫父類的初始化函式,否則否發建立執行緒
super().__init__()
def run(self) -> None:
while True:
print(self.n)
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.start()
執行結果如下,控制檯會不停地、交錯地列印“1”和“2”:
執行結果
4. 主執行緒和子執行緒
我們先把上述的程式碼簡單做一下修改,讓它在列印的同時列印活躍的執行緒個數,程式碼如下
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
while True:
_count = threading.active_count()
print(self.n, f"當前活躍的執行緒個數:{_count}")
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.start()
該程式碼執行結果如下:
這裡活躍的執行緒個數怎麼理解?
好那麼問題來了:當我建立了執行緒1並開始執行的時候,程式卻告訴我有2個活躍的執行緒呢?同樣地,我最終只建立了2個執行緒,為什麼程式卻告訴我有3個活躍的執行緒呢?
讓我們回到程序和執行緒的定義,當我們開始執行這個程式的時候,這個程式成為一個“有生命的”程序,程序至少有一個執行緒,這個執行緒就是主執行緒。當程式執行到第一次t.start()的時候,程式建立了一個子執行緒,此時活躍的執行緒個數是2。進一步,當執行第二次t.start()的時候,程式又建立了一個子執行緒,因此最終活躍的執行緒個數是3。
注意每個程序只有一個主執行緒。
5. 守護執行緒(Daemon Thread)
守護執行緒(Daemon Thread)也叫後臺程序,它的目的是為其他執行緒提供服務。如果其他執行緒被殺死了,那麼守護執行緒也就沒有了存在的必要。因此守護執行緒會隨著非守護執行緒的消亡而消亡。Thread類中,子執行緒被建立時預設是非守護執行緒,我們可以通過setDaemon(True)將一個子執行緒設定為守護執行緒。
我們把上面這個例子中建立的兩個子執行緒改寫為守護執行緒,看看會發生什麼:
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
while True:
_count = threading.active_count()
print(self.n, f"當前活躍的執行緒個數:{_count}")
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.setDaemon(True)
t.start()
print("結束!")
執行結果如下:
和前面不同,程式列印完“結束!”徹底結束了
和前面完全不同的是:程式列印完“結束!”後就徹底結束了,不再列印任何內容。這是為什麼呢?
因為當程式執行完print("結束!")以後,主執行緒就可以結束了,這時候被設定為守護執行緒的兩個子執行緒會被殺死,然後主執行緒結束。
現在,如果我把兩個子執行緒的其中一個設定為守護執行緒,另一個設定為非守護執行緒,會怎樣呢?程式碼如下:
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
while True:
_count = threading.active_count()
print(self.n, f"當前活躍的執行緒個數:{_count}")
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
if i == 1:
t.setDaemon(True) # 將其中一個執行緒設定為守護執行緒
t.start()
print("結束!")
你可能會想,守護執行緒會被殺死,非守護執行緒繼續執行。但實際情況並非如此,結果如下:
兩個子執行緒都在繼續執行
這是因為非守護執行緒作為前臺程式還在繼續執行,守護執行緒就還有“守護”的意義,就會繼續執行。
需要注意的是:將子執行緒設定為守護執行緒必須在呼叫start()方法之前,否則回引發RuntimeError異常。
6. join()方法
join()會使主執行緒進入等待狀態(阻塞),直到呼叫join()方法的子執行緒執行結束。同時你也可以通過設定timeout引數來設定等待的時間,如:
import time
import threading
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
while True:
_count = threading.active_count()
print(f"執行緒-{self.n}", f"當前活躍的執行緒個數:{_count}")
time.sleep(self.n)
for i in range(1, 3):
t = MyThread(i)
t.start()
t.join(3)
執行結果如下:
通過join()方法實現了主執行緒阻塞
7. 資料安全與執行緒鎖
現在假設你建立了兩個子執行緒操作同一個全域性變數number,number被初始化為0,兩個子執行緒通過for迴圈對這個number進行+1,每個子執行緒迴圈1000000次,兩個子執行緒同時進行。如果一切正常的話,最終這個number會變成2000000,然而現實並非如此。程式碼如下
import time
import threading
number = 0
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
global number
for i in range(1000000):
number += 1
for i in range(1, 3):
t = MyThread(i)
t.start()
# 給5秒鐘讓兩個子執行緒執行完畢
time.sleep(5)
# 確保兩個子執行緒執行完畢
print("活躍的執行緒個數:", threading.active_count())
# 輸出最終數值
print("number: ", number)
執行結果如下:
結果並不是2000000
這種情況稱為“髒資料”。產生髒資料的原因是,當一個執行緒在對資料進行修改時,修改到一半時另一個執行緒讀取了未經修改的資料並進行修改。如何避免髒資料的產生呢?一個辦法就是用join方法,即先讓一個執行緒執行完畢再執行另一個執行緒。但這樣的本質是把多執行緒變成了單執行緒,失去了多執行緒的意義。另一個辦法就是用執行緒鎖,threading模組中有如下幾種執行緒鎖[4]:
7.1 Lock互斥鎖
import time
import threading
number = 0
lock = threading.Lock() # 例項化一個鎖
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
global number
for i in range(1000000):
lock.acquire() # 開鎖,只允許當前執行緒訪問共享的資料
number += 1
lock.release() # 釋放鎖,允許其他執行緒訪問共享資料
for i in range(1, 3):
t = MyThread(i)
t.start()
# 給5秒鐘讓兩個子執行緒執行完畢
time.sleep(5)
# 確保兩個子執行緒執行完畢
print("活躍的執行緒個數:", threading.active_count())
# 輸出最終數值
print("number: ", number)
執行結果如下:
輸出正常
7.2 RLock
RLock和Lock的用法相同,區別在於:Lock只能開一次然後釋放一次,不能開多次,而RLock可以開多次,再進行多次釋放[5]。當然需要注意的是:RLock中雖然可以開多次,但是acquire和release的次數必須對應。
7.3 Semaphore
BoundedSemaphore類可以設定同一時間更改資料的執行緒個數
import time
import threading
semaphore = threading.BoundedSemaphore(3)
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
semaphore.acquire()
for i in range(100):
_count = threading.active_count() - 1
print(f"執行緒-{self.n}", f"當前活躍的子執行緒個數:{_count}")
time.sleep(1)
semaphore.release()
for i in range(1, 10):
t = MyThread(i)
t.start()
執行結果如下:
Semaphore設定了同時執行的執行緒的個數
7.4 Event
Event類會在全域性定義一個Flag,當Flag=False時,呼叫wait()方法會阻塞所有執行緒;而當Flag=True時,呼叫wait()方法不再阻塞。形象的比喻就是“紅綠燈”:在紅燈時阻塞所有執行緒,而在綠燈時又會一次性放行所有排隊中的執行緒。Event類有四個方法:
- set():將Flag設定為True
- wait():等待
- clear():將Flag設定為False
- is_set():返回bool值,判斷Flag是否為True
Event的一個好處是:可以實現執行緒間通訊,通過一個執行緒去控制另一個執行緒。
import time
import threading
event = threading.Event()
event.set() # 設定Flag = True
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
if self.n in [3, 4]:
event.clear() # 設定Flag = False
event.wait() # 執行緒3和4進入等待
for i in range(2):
_count = threading.active_count() - 1
print(f"執行緒-{self.n}", f"當前活躍的子執行緒個數:{_count}")
time.sleep(2)
if self.n == 2 and i == 1:
# 通過執行緒2來控制執行緒3和4
event.set()
for i in range(1, 5):
t = MyThread(i)
t.start()
執行結果如下:
通過一個執行緒去控制另一個執行緒
8. 一些小技巧
8.1 with上下門管理器
在使用Lock和RLock時,正確的開鎖-釋放鎖非常重要。通過with上下文管理器,可以保證執行緒鎖被正確釋放,而且程式碼也更加簡潔。如:
import time
import threading
number = 0
lock = threading.Lock() # 例項化一個鎖
class MyThread(threading.Thread):
def __init__(self, n):
self.n = n
super().__init__()
def run(self) -> None:
global number
for i in range(1000000):
with lock: # with上下文管理器
number += 1
for i in range(1, 3):
t = MyThread(i)
t.start()
# 給5秒鐘讓兩個子執行緒執行完畢
time.sleep(5)
# 確保兩個子執行緒執行完畢
print("活躍的執行緒個數:", threading.active_count())
# 輸出最終數值
print("number: ", number)
8.2 Timer計時器
通過threading.Timer類可以實現n秒後執行某操作。注意一個timer物件相當於一個新的子執行緒。
for i in range(1, 5):
t = MyThread(i)
if i == 4:
timer = Timer(0.1, t.start) # 5秒後再開始執行緒4
timer.start()
else:
t.start()
參考
- ^https://baike.baidu.com/item/%E8%BF%9B%E7%A8%8B/382503#:~:text=%E8%BF%9B%E7%A8%8B%EF%BC%88Process%EF%BC%89%E6%98%AF%E8%AE%A1%E7%AE%97%E6%9C%BA%E4%B8%AD,%E8%BF%9B%E7%A8%8B%E6%98%AF%E7%A8%8B%E5%BA%8F%E7%9A%84%E5%AE%9E%E4%BD%93%E3%80%82
- ^https://baike.baidu.com/item/%E7%BA%BF%E7%A8%8B/103101
- ^https://peps.python.org/pep-3108/#obsolete
- ^https://www.liujiangblog.com/course/python/79
- ^https://stackoverflow.com/questions/22885775/what-is-the-difference-between-lock-and-rlock