1. 程式人生 > 其它 >最多開啟多少個執行緒_[多執行緒]python threading初窺

最多開啟多少個執行緒_[多執行緒]python threading初窺

技術標籤:最多開啟多少個執行緒

因為工作需要,因此係統學習一下python的多執行緒。

環境:

python3.5.2

正文

當我們每次啟動一個程式時,我們都會啟動一個MainThread,在這個執行緒中程式碼是順序執行的,但是當我們做一些IO密集型的工作,如下載檔案時,如果後續操作暫時不依賴於下載的檔案,那麼期望能夠讓cpu繼續去執行接下來的程式碼,而不是一直等待,直到下載執行完成。因此可以用到多執行緒。

多執行緒的使用流程如下:

  1. 編寫需要線上程中執行的邏輯函式target()
  2. new一個子執行緒,並將target賦值給該子執行緒,target就是想要在子執行緒中執行的函式
  3. 在主執行緒中啟動該子執行緒

樣例:

# coding:utf-8

import threading
import time

def func(i):
    time.sleep(1)
    print("thread: {}, idx:{}".format(threading.current_thread(),i))

def run():
    for i in range(5):
        t = threading.Thread(target=func, args=(i, ))
        t.start()

if __name__ == "__main__":
    run()

在多執行緒的實際使用中,我們的需求並非唯一的。有時候我們希望所有子執行緒執行完之後再去執行主執行緒,有時候我們希望啟動子執行緒後能夠同步執行主執行緒,有時候我們需要順序執行部分子執行緒,再同時跑所有執行緒。因此掌握thread中幾個重要的類函式是必要的。

函式

  1. start()

啟動執行緒,一個thread只能呼叫一次。當使用start()的時候,會在開啟一個新的子執行緒中呼叫該執行緒的run()

2. run()

標準的執行緒run會執行在初始化執行緒時傳入的target函式。而自定義Thread時,需要override這個函式。

start和run的區別在於,只有使用start,才能真正做多執行緒操作。如果在主執行緒中直接對某個執行緒呼叫run(), 那麼實際上相當於在主執行緒呼叫這個子執行緒的run函式,而並沒有啟動該執行緒。

樣例:

# coding:utf-8

import threading
import time


def output(f):
    def wrapper(*args):
        print("{} start!".format(f.__name__))
        f(*args)
        print("{} is over!".format(f.__name__))
    return wrapper

@output
def func(i):
    time.sleep(1)
    print("thread: {}, idx:{}".format(threading.current_thread(),i))


@output
def run():
    for i in range(5):
        t = threading.Thread(target=func, args=(i, ))
        t.run()
        # t.start()
        # t.join()

if __name__ == "__main__":
    run()

如果在迴圈中呼叫t.join(),則主執行緒會開啟五個子執行緒,並且在func中列印當前執行緒的時候,列印的是各個子執行緒。

如果在迴圈中呼叫t.run(),那麼5個thread會依次執行,並且在func中列印當前執行緒的時候,列印的全是主執行緒。

3. join()

用於阻塞主執行緒,直到等待當前子執行緒跑完,才會繼續執行主執行緒。值得注意的是,在迴圈中當先用start()開啟執行緒,然後用join()阻塞了主執行緒,此時run()是在各個子執行緒而不是在主執行緒中執行。儘管在執行上和使用for i ... n;run()的效果一致。

4. is_alive()

當且僅當指定的執行緒在執行中時,返回True。

Lock

在多執行緒中,執行緒間是共享記憶體的,因此當多個執行緒同時對某一個變數進行讀寫時,它們操作的都是同一塊記憶體區域。但是由於排程的問題,無法確保執行緒的執行順序如程式碼的邏輯順序一樣,因此需要通過加鎖的方式來約束執行緒的執行順序。

# coding:utf-8

import threading

num = 0
lock = threading.Lock()

def func(idx):
    global num
    for i in range(1000000):
        # with lock:
            num += idx

def run():
    t1 = threading.Thread(target=func, args=(1, ))    
    t2 = threading.Thread(target=func, args=(-1,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("run is over! num is ", num)
    
if __name__ == "__main__":
    run()

上面的code中,t1 和 t2 分別對全域性變數 num 操作,如果不使用lock,那麼最後 num 的結果不一定是0,因為 num+=idx 實際上是兩條指令,t = num + idx 和 num = t,所以可能當t1執行了 t = num + idx 後,t2正好執行了 num = t,相當於t1的操作無效化了。(這裡其實根據操作的物件屬於可變物件和不可變物件,結果還有差異。文末會在補充1中詳細展開)

為了解決共享記憶體的問題,需要通過鎖的方式來防止多個執行緒同時讀寫同一塊記憶體,這裡可以通過threading.Lock()的方式來處理。用法如上例中註釋的with lock,其等價於在計算前先呼叫了 lock.acquire(),而在計算結束後呼叫了lock.release()。

然而Lock在無形中限制了某一個執行緒多次讀寫同一塊記憶體的能力。如下例:

# coding:utf-8

import threading

num = 0
lock = threading.Lock()

def add(idx):
    global num
    with lock:
        num += idx

def sub(idx):
    global num
    with lock:
        num -= idx

def cal(idx):
    global num
    with lock:
        for i in range(1000):
            add(idx)
        for i in range(1000):
            sub(idx)

def run():
    t1 = threading.Thread(target=cal, args=(2,))    
    t2 = threading.Thread(target=cal, args=(3,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("run is over! num is ", num)
    
if __name__ == "__main__":
    run()

在這種情況下,當某一個執行緒在cal函式中已經取得了鎖,然而在add或sub時需要進一步取鎖時既不能取到鎖,又不能將之前的鎖釋放掉,因此出現了死鎖(死鎖的概念會在文末補充2中介紹)的現象。當然,通過設計好的流程可以規避這一點,但是對於更大更復雜的專案來說,出現這樣的情況難以避免。因此,可以通過RLock(可重入鎖)來放鬆鎖對同一執行緒的限制。

RLock

RLock即允許同一個執行緒反覆申請某一個鎖,但是申請了多少次,就必須要釋放多少次後,鎖住的記憶體才能允許其他執行緒讀寫,除此之外沒有其他區別。

# coding:utf-8

import threading

num = 0
lock = threading.RLock()

def add(idx):
    global num
    with lock:
        num += idx

def sub(idx):
    global num
    with lock:
        num -= idx

def cal(idx):
    global num
    with lock:
        for i in range(1000):
            add(idx)
        for i in range(1000):
            sub(idx)

def run():
    t1 = threading.Thread(target=cal, args=(2,))    
    t2 = threading.Thread(target=cal, args=(3,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("run is over! num is ", num)
    
if __name__ == "__main__":
    run()

Semaphore(訊號量)

Lock or RLock,其目的都是為了防止多個執行緒同時修改同一塊記憶體。但是如果所有執行緒只是去讀同一塊記憶體,那麼Lock/RLock就無用武之地了。在實際場景中,我們可能會遇到需要限制同時讀取資源的執行緒數量的需求,比如爬取資料時,需要限制爬取資料的執行緒的併發數。這裡訊號量則派上了用場。可以通過threading.Semaphore()來例項化一個訊號量,傳入的引數表示最多允許同時讀取的thread數量。在下面程式碼中我們開啟了4個執行緒,而訊號量的設定為3。

# coding:utf-8

import threading
import time

num = 12
semaphore = threading.Semaphore(3)


def consum():
    global num
    with semaphore:
        while num:
            print("current_thread: {}".format(threading.current_thread()))
            num -= 1
            time.sleep(2)


def run():
    thread_list = [
        threading.Thread(target=consum)
        for _ in range(4)
    ]
    for t in thread_list:
        t.start()

    for t in thread_list:
        t.join()

    print("run is over, num: {}".format(num))


if __name__ == "__main__":
    run()

在程式執行時,我們可以看到列印結果是三個三個一出現(不一定是順序的),如下。

current_thread: <Thread(Thread-1, started 140288911263488)>
current_thread: <Thread(Thread-2, started 140288902870784)>
current_thread: <Thread(Thread-3, started 140288894478080)>

這裡訊號量的工作原理較為簡單,訊號量中有一個value,即允許併發的最大執行緒數。每當一個執行緒想要獲取共享資源的時候,首先會檢查當前value的值, 若value <=0 時則會掛起執行緒,等待已佔據共享資源的執行緒釋放。若value>0,則執行緒獲取共享資源,並通過acquire方法對value - 1。而當一個執行緒釋放共享資源時,通過release方法對value + 1。那麼特殊情況就是當value初始值為1,表示同一時間只允許一個執行緒去讀資料,這樣的訊號量又被稱為互斥量。

除了限制併發的執行緒數量,訊號量也可用於控制資料的供取,即常見的生產者-消費者模型。

# coding:utf-8

import threading
import time
import random
from queue import Queue

max_thread = 3
semaphore = threading.Semaphore(max_thread)
q = Queue(maxsize=10)


class Consumer(threading.Thread):

    def run(self):
        while True:
            time.sleep(random.random())
            semaphore.acquire()
            if q.empty():
                print("current_thread: {}, queue is empty!".format(
                    threading.current_thread()))
            else:
                print("current_thread: {}, consum: {}".format(
                    threading.current_thread(), q.get()))


class Producer(threading.Thread):
    def run(self):
        global semaphore
        while True:
            time.sleep(random.random()*5)
            print("queue size: {}".format(q.qsize()))
            if q.full():
                print("current_thread: {}, queue is full!".format(
                    threading.current_thread()))
            else:
                t = random.randint(0, 10)
                q.put(t)
                print("current_thread: {}, produce: {}".format(
                    threading.current_thread(), t))
            semaphore.release()


def run():
    product_thread = Producer(name="producer")
    consum_thread_list = [
        Consumer() for _ in range(4)
    ]
    for t in [product_thread] + consum_thread_list:
        t.start()

    for t in [product_thread] + consum_thread_list:
        t.join()

    print("run is over")


if __name__ == "__main__":
    run()

在本例中,建立了一個生產者執行緒,四個消費者執行緒。當生產者執行緒向佇列中加入資料時,會通過release增加訊號量的value,而被阻塞的執行緒將按順序依次獲取佇列中的值,同時通過acquire來減少訊號量的value。我在程式碼中將訊號量的初始值設為max_thread,一般是設成0,讓消費者執行緒一開始先掛起,等待生產者執行緒寫入。

current_thread: <Thread(producer, started 139740371240704)>, product: 7
current_thread: <Thread(Thread-1, started 139740362848000)>, consum: 7
current_thread: <Thread(producer, started 139740371240704)>, product: 2
current_thread: <Thread(Thread-2, started 139740354455296)>, consum: 2
current_thread: <Thread(producer, started 139740371240704)>, product: 8
current_thread: <Thread(Thread-3, started 139740346062592)>, consum: 8
current_thread: <Thread(producer, started 139740371240704)>, product: 4
current_thread: <Thread(Thread-4, started 139740337669888)>, consum: 4

但是實際上,並不推薦用訊號量來做生產-消費模型。這是因為訊號量中的value,名義上是最大併發執行緒數,但是它並不真的是一個上界。只要生產的速度遠大於消費(比如把生產執行緒程式碼裡的sleep註釋掉),那麼生產執行緒會不停呼叫release(),因此value是持續增加的。

條件(Condition)

生產-消費模型,生產者會將item放入佇列,而消費者會從佇列中取出item,並且佇列滿時,生產者需要等待,佇列空時,消費者需要等待。因此關鍵點在於 1. 對於佇列的讀寫,每一個生產者/消費者都是互斥的。2. 在1的基礎上當佇列狀態改變時,對應的執行緒還需要阻塞。所以如果只用一個鎖,是無法滿足2的效果。

lock = threading.Lock()
q = Queue(maxsize=10)

class Consumer(threading.Thread):

    def run(self):
        while True:
            time.sleep(random.random())
            with lock:
                if q.empty():
                    print("current_thread: {}, queue is empty!".format(
                        threading.current_thread()))
                else:
                    print("current_thread: {}, consum: {}".format(
                        threading.current_thread(), q.get()))


class Producer(threading.Thread):
    def run(self):
        while True:
            # time.sleep(random.random()*5)
            with lock:
                if q.full():
                    print("current_thread: {}, queue is full!".format(
                        threading.current_thread()))
                else:
                    t = random.randint(0, 10)
                    q.put(t)
                    print("current_thread: {}, produce: {}".format(
                        threading.current_thread(), t))


def run():
    product_thread = Producer(name="producer")
    consum_thread_list = [
        Consumer() for _ in range(4)
    ]
    for t in [product_thread] + consum_thread_list:
        t.start()

    for t in [product_thread] + consum_thread_list:
        t.join()

    print("run is over")


if __name__ == "__main__":
    run()

在上例中,由於生產者生產速度極快,因此在佇列填滿後,會不停的申請鎖釋放鎖,而其他消費者執行緒根本無法申請到鎖,因此會一直處於阻塞狀態。那如果反過來,消費者消費速度極快,則會在佇列為空後,不停申請釋放鎖,因此生產者執行緒無法申請鎖,同樣會一直處於阻塞狀態。

那想當然的解決方法,就是針對佇列狀態再加鎖,當佇列狀態為滿時,阻塞生產者,等到某一個消費者消費之後解除生產者的阻塞。而當佇列狀態為空時,阻塞消費者,等到生產者生產之後,解除排在最前面的消費者的阻塞。

那實現這樣的想法,我們可以新增一個等待佇列和一把鎖B,每當某一個執行緒因為資料佇列狀態變化而應當被阻塞時,就讓該執行緒去acquire鎖B兩次,這樣就把它阻塞住了,然後將鎖增加到等待佇列中。在滿足一定條件後,從隊頭取出鎖並release,這樣就解除鎖住鎖B執行緒的阻塞。比如當佇列滿時,通過鎖B把生產者阻塞住,然後把鎖B加入等待佇列。當有消費者從資料佇列裡取出了資料,那麼就從等待佇列裡把鎖B取出並release掉,這樣生產者就可以解除阻塞,重新恢復生產。

而這,就是python中條件鎖的設計思路。


補充

1. 關於python的 += 和 +

python中 += 和 + 的區別只針對於可變物件。對於不可變物件(比如常數),實際上num+=idx 和 num = num + idx 沒有區別,都是加操作生成了新的物件,然後再將 num 的引用指向了它。

a = 257
print(id(a))
a += 2
print(id(a)) 
a = a + 2
print(id(a)) 

三次print的結果都是不同。原因是 += 操作本身的確是一個in-place方法,呼叫的是__iadd__內建方法a+=b等價於 a.__iadd__(b),但是不可變物件是沒有__iadd__的,因此+=在找不到__iadd_方法時會去呼叫__add__方法,所以在操作不可變物件(如常數,元祖等)時,+= 和 + 是等價的。但是操作list,dict等可變物件時,效果就不同了。

a = [1] 
print(id(a))
a.__iadd__([2]) 
print(id(a))
a = a + [2]
print(id(a))

第一,二次print結果是相同的,而第三次print則不同。

所以在多執行緒中,如果操作物件是可變物件,那麼某些情況下加不加鎖,結果都是會穩定輸出的,比如下例求 num 的長度,會穩定打出30001。

num = [0]
lock = threading.Lock()

def func(idx):
    global num
    for i in range(10000):
        if idx > 0:
            num += [1]
        else:
            num += [2, 2]

def run():
    t1 = threading.Thread(target=func, args=(1, ))    
    t2 = threading.Thread(target=func, args=(-1,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("run is over! len(num) is ", len(num))
    
if __name__ == "__main__":
    run()

但如果換成 + ,結果會變得不穩定,原因如前文所說,不再贅述。

def func(idx):
    global num
    for i in range(10000):
        if idx > 0:
            num = num + [1]
        else:
            num = num + [2, 2]

def run():
    t1 = threading.Thread(target=func, args=(1, ))    
    t2 = threading.Thread(target=func, args=(-1,))

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print("run is over! len(num) is ", len(num))
    
if __name__ == "__main__":
    run()

2.死鎖

死鎖實質上就是每個執行緒需要的資源都被其他執行緒鎖住,而自身又鎖住了其他執行緒需要的資源。舉個例子,假如從一個房間出來需要開兩扇門A和B,你和你的室友手上各有一把鑰匙,現在你握有A門的鑰匙,從A門外進入過道並鎖上了A門,而室友握有B門的鑰匙,他從房間裡出來進入過道並鎖上了B門,由於你們不能交換鑰匙,因此你無法開啟B門進入房間,室友也無法開啟A門離開房間,因此你們只好永遠站在過道里了。

死鎖的形成有四個條件:

  1. 互斥:即一個資源只能被一個執行緒申請。
  2. 請求和保持:即當前執行緒已佔有資源,但同時又在申請其他資源
  3. 不可剝奪:即資源在被某個執行緒主動釋放前無法被其他執行緒申請。
  4. 環路等待:即所有執行緒都佔有了資源,並且需要其他執行緒佔有的資源才能進行執行。

那麼打破死鎖只要破壞上述任一一個條件即可。