最多開啟多少個執行緒_[多執行緒]python threading初窺
技術標籤:最多開啟多少個執行緒
因為工作需要,因此係統學習一下python的多執行緒。
環境:
python3.5.2
正文
當我們每次啟動一個程式時,我們都會啟動一個MainThread,在這個執行緒中程式碼是順序執行的,但是當我們做一些IO密集型的工作,如下載檔案時,如果後續操作暫時不依賴於下載的檔案,那麼期望能夠讓cpu繼續去執行接下來的程式碼,而不是一直等待,直到下載執行完成。因此可以用到多執行緒。
多執行緒的使用流程如下:
- 編寫需要線上程中執行的邏輯函式target()
- new一個子執行緒,並將target賦值給該子執行緒,target就是想要在子執行緒中執行的函式
- 在主執行緒中啟動該子執行緒
樣例:
# coding:utf-8 import threading import time def func(i): time.sleep(1) print("thread: {}, idx:{}".format(threading.current_thread(),i)) def run(): for i in range(5): t = threading.Thread(target=func, args=(i, )) t.start() if __name__ == "__main__": run()
在多執行緒的實際使用中,我們的需求並非唯一的。有時候我們希望所有子執行緒執行完之後再去執行主執行緒,有時候我們希望啟動子執行緒後能夠同步執行主執行緒,有時候我們需要順序執行部分子執行緒,再同時跑所有執行緒。因此掌握thread中幾個重要的類函式是必要的。
函式
- start()
啟動執行緒,一個thread只能呼叫一次。當使用start()的時候,會在開啟一個新的子執行緒中呼叫該執行緒的run()
2. run()
標準的執行緒run會執行在初始化執行緒時傳入的target函式。而自定義Thread時,需要override這個函式。
start和run的區別在於,只有使用start,才能真正做多執行緒操作。如果在主執行緒中直接對某個執行緒呼叫run(), 那麼實際上相當於在主執行緒呼叫這個子執行緒的run函式,而並沒有啟動該執行緒。
樣例:
# coding:utf-8
import threading
import time
def output(f):
def wrapper(*args):
print("{} start!".format(f.__name__))
f(*args)
print("{} is over!".format(f.__name__))
return wrapper
@output
def func(i):
time.sleep(1)
print("thread: {}, idx:{}".format(threading.current_thread(),i))
@output
def run():
for i in range(5):
t = threading.Thread(target=func, args=(i, ))
t.run()
# t.start()
# t.join()
if __name__ == "__main__":
run()
如果在迴圈中呼叫t.join(),則主執行緒會開啟五個子執行緒,並且在func中列印當前執行緒的時候,列印的是各個子執行緒。
如果在迴圈中呼叫t.run(),那麼5個thread會依次執行,並且在func中列印當前執行緒的時候,列印的全是主執行緒。
3. join()
用於阻塞主執行緒,直到等待當前子執行緒跑完,才會繼續執行主執行緒。值得注意的是,在迴圈中當先用start()開啟執行緒,然後用join()阻塞了主執行緒,此時run()是在各個子執行緒而不是在主執行緒中執行。儘管在執行上和使用for i ... n;run()的效果一致。
4. is_alive()
當且僅當指定的執行緒在執行中時,返回True。
Lock
在多執行緒中,執行緒間是共享記憶體的,因此當多個執行緒同時對某一個變數進行讀寫時,它們操作的都是同一塊記憶體區域。但是由於排程的問題,無法確保執行緒的執行順序如程式碼的邏輯順序一樣,因此需要通過加鎖的方式來約束執行緒的執行順序。
# coding:utf-8
import threading
num = 0
lock = threading.Lock()
def func(idx):
global num
for i in range(1000000):
# with lock:
num += idx
def run():
t1 = threading.Thread(target=func, args=(1, ))
t2 = threading.Thread(target=func, args=(-1,))
t1.start()
t2.start()
t1.join()
t2.join()
print("run is over! num is ", num)
if __name__ == "__main__":
run()
上面的code中,t1 和 t2 分別對全域性變數 num 操作,如果不使用lock,那麼最後 num 的結果不一定是0,因為 num+=idx 實際上是兩條指令,t = num + idx 和 num = t,所以可能當t1執行了 t = num + idx 後,t2正好執行了 num = t,相當於t1的操作無效化了。(這裡其實根據操作的物件屬於可變物件和不可變物件,結果還有差異。文末會在補充1中詳細展開)
為了解決共享記憶體的問題,需要通過鎖的方式來防止多個執行緒同時讀寫同一塊記憶體,這裡可以通過threading.Lock()的方式來處理。用法如上例中註釋的with lock,其等價於在計算前先呼叫了 lock.acquire(),而在計算結束後呼叫了lock.release()。
然而Lock在無形中限制了某一個執行緒多次讀寫同一塊記憶體的能力。如下例:
# coding:utf-8
import threading
num = 0
lock = threading.Lock()
def add(idx):
global num
with lock:
num += idx
def sub(idx):
global num
with lock:
num -= idx
def cal(idx):
global num
with lock:
for i in range(1000):
add(idx)
for i in range(1000):
sub(idx)
def run():
t1 = threading.Thread(target=cal, args=(2,))
t2 = threading.Thread(target=cal, args=(3,))
t1.start()
t2.start()
t1.join()
t2.join()
print("run is over! num is ", num)
if __name__ == "__main__":
run()
在這種情況下,當某一個執行緒在cal函式中已經取得了鎖,然而在add或sub時需要進一步取鎖時既不能取到鎖,又不能將之前的鎖釋放掉,因此出現了死鎖(死鎖的概念會在文末補充2中介紹)的現象。當然,通過設計好的流程可以規避這一點,但是對於更大更復雜的專案來說,出現這樣的情況難以避免。因此,可以通過RLock(可重入鎖)來放鬆鎖對同一執行緒的限制。
RLock
RLock即允許同一個執行緒反覆申請某一個鎖,但是申請了多少次,就必須要釋放多少次後,鎖住的記憶體才能允許其他執行緒讀寫,除此之外沒有其他區別。
# coding:utf-8
import threading
num = 0
lock = threading.RLock()
def add(idx):
global num
with lock:
num += idx
def sub(idx):
global num
with lock:
num -= idx
def cal(idx):
global num
with lock:
for i in range(1000):
add(idx)
for i in range(1000):
sub(idx)
def run():
t1 = threading.Thread(target=cal, args=(2,))
t2 = threading.Thread(target=cal, args=(3,))
t1.start()
t2.start()
t1.join()
t2.join()
print("run is over! num is ", num)
if __name__ == "__main__":
run()
Semaphore(訊號量)
Lock or RLock,其目的都是為了防止多個執行緒同時修改同一塊記憶體。但是如果所有執行緒只是去讀同一塊記憶體,那麼Lock/RLock就無用武之地了。在實際場景中,我們可能會遇到需要限制同時讀取資源的執行緒數量的需求,比如爬取資料時,需要限制爬取資料的執行緒的併發數。這裡訊號量則派上了用場。可以通過threading.Semaphore()來例項化一個訊號量,傳入的引數表示最多允許同時讀取的thread數量。在下面程式碼中我們開啟了4個執行緒,而訊號量的設定為3。
# coding:utf-8
import threading
import time
num = 12
semaphore = threading.Semaphore(3)
def consum():
global num
with semaphore:
while num:
print("current_thread: {}".format(threading.current_thread()))
num -= 1
time.sleep(2)
def run():
thread_list = [
threading.Thread(target=consum)
for _ in range(4)
]
for t in thread_list:
t.start()
for t in thread_list:
t.join()
print("run is over, num: {}".format(num))
if __name__ == "__main__":
run()
在程式執行時,我們可以看到列印結果是三個三個一出現(不一定是順序的),如下。
current_thread: <Thread(Thread-1, started 140288911263488)>
current_thread: <Thread(Thread-2, started 140288902870784)>
current_thread: <Thread(Thread-3, started 140288894478080)>
這裡訊號量的工作原理較為簡單,訊號量中有一個value,即允許併發的最大執行緒數。每當一個執行緒想要獲取共享資源的時候,首先會檢查當前value的值, 若value <=0 時則會掛起執行緒,等待已佔據共享資源的執行緒釋放。若value>0,則執行緒獲取共享資源,並通過acquire方法對value - 1。而當一個執行緒釋放共享資源時,通過release方法對value + 1。那麼特殊情況就是當value初始值為1,表示同一時間只允許一個執行緒去讀資料,這樣的訊號量又被稱為互斥量。
除了限制併發的執行緒數量,訊號量也可用於控制資料的供取,即常見的生產者-消費者模型。
# coding:utf-8
import threading
import time
import random
from queue import Queue
max_thread = 3
semaphore = threading.Semaphore(max_thread)
q = Queue(maxsize=10)
class Consumer(threading.Thread):
def run(self):
while True:
time.sleep(random.random())
semaphore.acquire()
if q.empty():
print("current_thread: {}, queue is empty!".format(
threading.current_thread()))
else:
print("current_thread: {}, consum: {}".format(
threading.current_thread(), q.get()))
class Producer(threading.Thread):
def run(self):
global semaphore
while True:
time.sleep(random.random()*5)
print("queue size: {}".format(q.qsize()))
if q.full():
print("current_thread: {}, queue is full!".format(
threading.current_thread()))
else:
t = random.randint(0, 10)
q.put(t)
print("current_thread: {}, produce: {}".format(
threading.current_thread(), t))
semaphore.release()
def run():
product_thread = Producer(name="producer")
consum_thread_list = [
Consumer() for _ in range(4)
]
for t in [product_thread] + consum_thread_list:
t.start()
for t in [product_thread] + consum_thread_list:
t.join()
print("run is over")
if __name__ == "__main__":
run()
在本例中,建立了一個生產者執行緒,四個消費者執行緒。當生產者執行緒向佇列中加入資料時,會通過release增加訊號量的value,而被阻塞的執行緒將按順序依次獲取佇列中的值,同時通過acquire來減少訊號量的value。我在程式碼中將訊號量的初始值設為max_thread,一般是設成0,讓消費者執行緒一開始先掛起,等待生產者執行緒寫入。
current_thread: <Thread(producer, started 139740371240704)>, product: 7
current_thread: <Thread(Thread-1, started 139740362848000)>, consum: 7
current_thread: <Thread(producer, started 139740371240704)>, product: 2
current_thread: <Thread(Thread-2, started 139740354455296)>, consum: 2
current_thread: <Thread(producer, started 139740371240704)>, product: 8
current_thread: <Thread(Thread-3, started 139740346062592)>, consum: 8
current_thread: <Thread(producer, started 139740371240704)>, product: 4
current_thread: <Thread(Thread-4, started 139740337669888)>, consum: 4
但是實際上,並不推薦用訊號量來做生產-消費模型。這是因為訊號量中的value,名義上是最大併發執行緒數,但是它並不真的是一個上界。只要生產的速度遠大於消費(比如把生產執行緒程式碼裡的sleep註釋掉),那麼生產執行緒會不停呼叫release(),因此value是持續增加的。
條件(Condition)
生產-消費模型,生產者會將item放入佇列,而消費者會從佇列中取出item,並且佇列滿時,生產者需要等待,佇列空時,消費者需要等待。因此關鍵點在於 1. 對於佇列的讀寫,每一個生產者/消費者都是互斥的。2. 在1的基礎上當佇列狀態改變時,對應的執行緒還需要阻塞。所以如果只用一個鎖,是無法滿足2的效果。
lock = threading.Lock()
q = Queue(maxsize=10)
class Consumer(threading.Thread):
def run(self):
while True:
time.sleep(random.random())
with lock:
if q.empty():
print("current_thread: {}, queue is empty!".format(
threading.current_thread()))
else:
print("current_thread: {}, consum: {}".format(
threading.current_thread(), q.get()))
class Producer(threading.Thread):
def run(self):
while True:
# time.sleep(random.random()*5)
with lock:
if q.full():
print("current_thread: {}, queue is full!".format(
threading.current_thread()))
else:
t = random.randint(0, 10)
q.put(t)
print("current_thread: {}, produce: {}".format(
threading.current_thread(), t))
def run():
product_thread = Producer(name="producer")
consum_thread_list = [
Consumer() for _ in range(4)
]
for t in [product_thread] + consum_thread_list:
t.start()
for t in [product_thread] + consum_thread_list:
t.join()
print("run is over")
if __name__ == "__main__":
run()
在上例中,由於生產者生產速度極快,因此在佇列填滿後,會不停的申請鎖釋放鎖,而其他消費者執行緒根本無法申請到鎖,因此會一直處於阻塞狀態。那如果反過來,消費者消費速度極快,則會在佇列為空後,不停申請釋放鎖,因此生產者執行緒無法申請鎖,同樣會一直處於阻塞狀態。
那想當然的解決方法,就是針對佇列狀態再加鎖,當佇列狀態為滿時,阻塞生產者,等到某一個消費者消費之後解除生產者的阻塞。而當佇列狀態為空時,阻塞消費者,等到生產者生產之後,解除排在最前面的消費者的阻塞。
那實現這樣的想法,我們可以新增一個等待佇列和一把鎖B,每當某一個執行緒因為資料佇列狀態變化而應當被阻塞時,就讓該執行緒去acquire鎖B兩次,這樣就把它阻塞住了,然後將鎖增加到等待佇列中。在滿足一定條件後,從隊頭取出鎖並release,這樣就解除鎖住鎖B執行緒的阻塞。比如當佇列滿時,通過鎖B把生產者阻塞住,然後把鎖B加入等待佇列。當有消費者從資料佇列裡取出了資料,那麼就從等待佇列裡把鎖B取出並release掉,這樣生產者就可以解除阻塞,重新恢復生產。
而這,就是python中條件鎖的設計思路。
補充
1. 關於python的 += 和 +
python中 += 和 + 的區別只針對於可變物件。對於不可變物件(比如常數),實際上num+=idx 和 num = num + idx 沒有區別,都是加操作生成了新的物件,然後再將 num 的引用指向了它。
a = 257
print(id(a))
a += 2
print(id(a))
a = a + 2
print(id(a))
三次print的結果都是不同。原因是 += 操作本身的確是一個in-place方法,呼叫的是__iadd__內建方法,a+=b等價於 a.__iadd__(b),但是不可變物件是沒有__iadd__的,因此+=在找不到__iadd_方法時會去呼叫__add__方法,所以在操作不可變物件(如常數,元祖等)時,+= 和 + 是等價的。但是操作list,dict等可變物件時,效果就不同了。
a = [1]
print(id(a))
a.__iadd__([2])
print(id(a))
a = a + [2]
print(id(a))
第一,二次print結果是相同的,而第三次print則不同。
所以在多執行緒中,如果操作物件是可變物件,那麼某些情況下加不加鎖,結果都是會穩定輸出的,比如下例求 num 的長度,會穩定打出30001。
num = [0]
lock = threading.Lock()
def func(idx):
global num
for i in range(10000):
if idx > 0:
num += [1]
else:
num += [2, 2]
def run():
t1 = threading.Thread(target=func, args=(1, ))
t2 = threading.Thread(target=func, args=(-1,))
t1.start()
t2.start()
t1.join()
t2.join()
print("run is over! len(num) is ", len(num))
if __name__ == "__main__":
run()
但如果換成 + ,結果會變得不穩定,原因如前文所說,不再贅述。
def func(idx):
global num
for i in range(10000):
if idx > 0:
num = num + [1]
else:
num = num + [2, 2]
def run():
t1 = threading.Thread(target=func, args=(1, ))
t2 = threading.Thread(target=func, args=(-1,))
t1.start()
t2.start()
t1.join()
t2.join()
print("run is over! len(num) is ", len(num))
if __name__ == "__main__":
run()
2.死鎖
死鎖實質上就是每個執行緒需要的資源都被其他執行緒鎖住,而自身又鎖住了其他執行緒需要的資源。舉個例子,假如從一個房間出來需要開兩扇門A和B,你和你的室友手上各有一把鑰匙,現在你握有A門的鑰匙,從A門外進入過道並鎖上了A門,而室友握有B門的鑰匙,他從房間裡出來進入過道並鎖上了B門,由於你們不能交換鑰匙,因此你無法開啟B門進入房間,室友也無法開啟A門離開房間,因此你們只好永遠站在過道里了。
死鎖的形成有四個條件:
- 互斥:即一個資源只能被一個執行緒申請。
- 請求和保持:即當前執行緒已佔有資源,但同時又在申請其他資源
- 不可剝奪:即資源在被某個執行緒主動釋放前無法被其他執行緒申請。
- 環路等待:即所有執行緒都佔有了資源,並且需要其他執行緒佔有的資源才能進行執行。
那麼打破死鎖只要破壞上述任一一個條件即可。