1. 程式人生 > 其它 >python 多執行緒講解(如何實現多執行緒,遞迴鎖,互斥鎖,訊號量,事件等)

python 多執行緒講解(如何實現多執行緒,遞迴鎖,互斥鎖,訊號量,事件等)

技術標籤:爬蟲python多執行緒

什麼是執行緒

執行緒(Thread)也叫輕量級程序,是作業系統能夠進行運算排程的最小單位,它被包涵在程序之中,是程序中的實際運作單位。執行緒自己不擁有系統資源,只擁有一點兒在執行中必不可少的資源,但它可與同屬一個程序的其它執行緒共享程序所擁有的全部資源。一個執行緒可以建立和撤消另一個執行緒,同一程序中的多個執行緒之間可以併發執行。

為什麼使用多執行緒

執行緒在程式中是獨立的、併發的執行流。與分隔的程序相比,程序中執行緒之間的隔離程度要小,它們共享記憶體、檔案控制代碼和其他程序應有的狀態。
因為執行緒的劃分尺度小於程序,使得多執行緒程式的併發性高。程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享記憶體,從而極大地提高了程式的執行效率。

執行緒比程序具有更高的效能,這是由於同一個程序中的執行緒都有共性多個執行緒共享同一個程序的虛擬空間。執行緒共享的環境包括程序程式碼段、程序的公有資料等,利用這些共享的資料,執行緒之間很容易實現通訊。
作業系統在建立程序時,必須為該程序分配獨立的記憶體空間,並分配大量的相關資源,但建立執行緒則簡單得多。因此,使用多執行緒來實現併發比使用多程序的效能要高得多。
總結起來,使用多執行緒程式設計具有如下幾個優點:

  1. 程序之間不能共享記憶體,但執行緒之間共享記憶體非常容易
  2. 作業系統在建立程序時,需要為該程序重新分配系統資源,但建立執行緒的代價則小得多。因此,使用多執行緒來實現多工併發執行比使用多程序的效率高
  3. Python 語言內建了多執行緒功能支援,而不是單純地作為底層作業系統的排程方式,從而簡化了 Python 的多執行緒程式設計

執行緒實現

threading模組
普通建立

import threading

def run(n,i):
    print("我是執行緒", n,i)


if __name__ == '__main__':
    for i in range(10):
        t1 = threading.Thread(target=run, args=("A",i))
        t2 = threading.Thread(
target=run, args=("B",i)) t1.start() t2.start()

傳遞引數的方法:

使用args 傳遞引數 threading.Thread(target=fun, args=(10, 100, 100))
使用kwargs傳遞引數 threading.Thread(target=fun, kwargs={“a”: 10, “b”:100, “c”: 100})
同時使用 args 和 kwargs 傳遞引數 threading.Thread(target=fun, args=(10, ), kwargs={“b”: 100,“c”: 100})

自定義執行緒

import threading

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重構run函式必須要寫
        self.n = n

    def run(self):
        print("我是執行緒", self.n)


if __name__ == "__main__":
    for i in range(10):
        t1 = MyThread("t1")
        t2 = MyThread("t2")
        t1.start()
        t2.start()

守護執行緒
使用setDaemon(True)把所有的子執行緒都變成了主執行緒的守護執行緒,因此當主程序結束後,子執行緒也會隨之結束。所以當主執行緒結束後,整個程式就退出了。

import threading
import time


def run(n):
    print("task", n)
    time.sleep(1)  # 此時子執行緒停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')


if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)  # 把子程序設定為守護執行緒,必須在start()之前設定
    t.start()
    print("end")

程式碼中
t和print(“end”)是不同的執行緒,print(“end”)是主執行緒

主執行緒等待子執行緒結束
為了讓守護執行緒執行結束之後,主執行緒再結束,我們可以使用join方法,讓主執行緒等待子執行緒執行。

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子執行緒停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   #把子程序設定為守護執行緒,必須在start()之前設定
    t.start()
    t.join() # 設定主執行緒等待子執行緒結束
    print("end")

多執行緒共享全域性變數
執行緒是程序的執行單元,程序是系統分配資源的最小單位,所以在同一個程序中的多執行緒是共享資源的。

import threading
import time

g_num = 100

def work1():
    global g_num
    for i in range(3):
        g_num += 1
    print("in work1 g_num is : %d" % g_num)

def work2():
    #global g_num
    print("in work2 g_num is : %d" % g_num)

if __name__ == '__main__':
    t1 = threading.Thread(target=work1)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=work2)
    t2.start()

互斥鎖
由於執行緒之間是進行隨機排程,並且每個執行緒可能只執行n條執行之後,當多個執行緒同時修改同一條資料時可能會出現髒資料,所以,出現了執行緒鎖,即同一時刻允許一個執行緒執行操作。執行緒鎖用於鎖定資源,你可以定義多個鎖, 像下面的程式碼, 當你需要獨佔某一資源時,任何一個鎖都可以鎖這個資源,就好比你用不同的鎖都可以把相同的一個門鎖住是一個道理。

由於執行緒之間是進行隨機排程,如果有多個執行緒同時操作一個物件,如果沒有很好地保護該物件,會造成程式結果的不可預期,我們也稱此為“執行緒不安全”。
為了方式上面情況的發生,就出現了互斥鎖(Lock)

如果不使用鎖

import time, threading

# 假定這是你的銀行卡餘額:
balance = 0

def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(1000000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

定義了一個共享變數balance,初始值為0,並且啟動兩個執行緒,先存後取,理論上結果應該為0,但是,由於執行緒的排程是由作業系統決定的,當t1、t2交替執行時,只要迴圈次數足夠多,balance的結果就不一定是0了。

原因是因為高階語言的一條語句在CPU執行時是若干條語句,即使一個簡單的計算:

balance = balance + n

也分兩步:

x = balance + n
balance = x

兩條語句被執行多次就說不定不是0啦
究其原因,是因為修改balance需要多條語句,而執行這幾條語句時,執行緒可能中斷,從而導致多個執行緒把同一個物件的內容改亂了。

兩個執行緒同時一存一取,就可能導致餘額不對,你肯定不希望你的銀行存款莫名其妙地變成了負數,所以,必須確保一個執行緒在修改balance的時候,別的執行緒一定不能改。
如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個執行緒開始執行change_it()時,我們說,該執行緒因為獲得了鎖,因此其他執行緒不能同時執行change_it(),只能等待,直到鎖被釋放後,獲得該鎖以後才能改。由於鎖只有一個,無論多少執行緒,同一時刻最多隻有一個執行緒持有該鎖,所以,不會造成修改的衝突。建立一個鎖就是通過threading.Lock()來實現:

import time, threading

# 假定這是你的銀行卡餘額:
balance = 0

def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n


lock = threading.Lock()
def run_thread(n):
    for i in range(1000000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

遞迴鎖
RLcok類的用法和Lock類一模一樣,但它支援巢狀,在多個鎖沒有釋放的時候一般會使用RLcok類。

import threading
import time

def Func(lock):
    global gl_num
    lock.acquire()
    gl_num += 1
    time.sleep(1)
    print(gl_num)
    lock.release()

if __name__ == '__main__':
    gl_num = 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=Func, args=(lock,))
        t.start()

訊號量(BoundedSemaphore類)
互斥鎖同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。

import time

def run(n, semaphore):
    semaphore.acquire()   #加鎖
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()     #釋放

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # 最多允許5個執行緒同時執行
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('-----all threads done-----')

事件(Event類)

python執行緒的事件用於主執行緒控制其他執行緒的執行,事件是一個簡單的執行緒同步物件,其主要提供以下幾個方法:

clear 將flag設定為“False”
set 將flag設定為“True”
is_set 判斷是否設定了flag
wait 會一直監聽flag,如果沒有檢測到flag就一直處於阻塞狀態
事件處理的機制:全域性定義了一個“Flag”,當flag值為“False”,那麼event.wait()就會阻塞,當flag值為“True”,那麼event.wait()便不再阻塞。

#利用Event類模擬紅綠燈
import threading
import time

event = threading.Event()


def lighter():
    count = 0
    event.set()     #初始值為綠燈
    while True:
        if 5 < count <=10 :
            event.clear()  # 紅燈,清除標誌位
            print("\33[41;1mred light is on...\033[0m")
        elif count > 10:
            event.set()  # 綠燈,設定標誌位
            count = 0
        else:
            print("\33[42;1mgreen light is on...\033[0m")

        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():      #判斷是否設定了標誌位
            print("[%s] running..."%name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."%name)
            event.wait()
            print("[%s] green light is on,start going..."%name)

light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car,args=("MINI",))
car.start()

在這裡插入圖片描述
本文主要轉述來源
https://www.cnblogs.com/luyuze95/p/11289143.html