1. 程式人生 > 實用技巧 >P1613 跑路

P1613 跑路

執行緒介紹

什麼是執行緒

執行緒(Thread)也叫輕量級程序,是作業系統能夠進行運算排程的最小單位,它被包涵在程序之中,是程序中的實際運作單位。執行緒自己不擁有系統資源,只擁有一點兒在執行中必不可少的資源,但它可與同屬一個程序的其它執行緒共享程序所擁有的全部資源。一個執行緒可以建立和撤消另一個執行緒,同一程序中的多個執行緒之間可以併發執行。

為什麼要使用多執行緒

執行緒在程式中是獨立的、併發的執行流。與分隔的程序相比,程序中執行緒之間的隔離程度要小,它們共享記憶體、檔案控制代碼和其他程序應有的狀態。

因為執行緒的劃分尺度小於程序,使得多執行緒程式的併發性高。程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享記憶體,從而極大地提高了程式的執行效率。

執行緒比程序具有更高的效能,這是由於同一個程序中的執行緒都有共性多個執行緒共享同一個程序的虛擬空間。執行緒共享的環境包括程序程式碼段、程序的公有資料等,利用這些共享的資料,執行緒之間很容易實現通訊。

作業系統在建立程序時,必須為該程序分配獨立的記憶體空間,並分配大量的相關資源,但建立執行緒則簡單得多。因此,使用多執行緒來實現併發比使用多程序的效能要高得多。

總結起來,使用多執行緒程式設計具有如下幾個優點:

  • 程序之間不能共享記憶體,但執行緒之間共享記憶體非常容易。

  • 作業系統在建立程序時,需要為該程序重新分配系統資源,但建立執行緒的代價則小得多。因此,使用多執行緒來實現多工併發執行比使用多程序的效率高。

  • Python 語言內建了多執行緒功能支援,而不是單純地作為底層作業系統的排程方式,從而簡化了 Python 的多執行緒程式設計。

單執行緒

我想做菜,那麼其中的洗菜,炒菜等動作是先後進行的。

from time import ctime,sleep

def wash():
    for i in range(2):
        print "I was washing vegetables. %s" %ctime()
        sleep(1)

def cook():
    for i in range(2):
        print "I was cooking vegetables. %s
" %ctime() sleep(5) if __name__ == '__main__': wash() cook() print "all over %s" %ctime()

$ I was washing vegetables. Tue Dec  1 17:52:37 2020
$ I was washing vegetables. Tue Dec  1 17:52:38 2020
$ I was cooking vegetables. Tue Dec  1 17:52:39 2020
$ I was cooking vegetables. Tue Dec  1 17:52:44 2020
$ all over Tue Dec 1 17:52:49 2020

先洗菜,通過for迴圈來控制洗了兩次,每次需要1秒鐘,sleep()來控制洗菜的時長。接著我們開始炒菜,

每個菜需要5秒鐘,這裡炒了兩個。在整過程結束後,我通過

print "all over %s" %ctime()

看了一下當前時間,執行結果如上。

其實上面的例項就是單執行緒,每件事必須先後的執行,不能同時執行。

多執行緒

在我們生活中,存在很多,我們可以吃著東西看著電影,騎著車,聽著音樂,這種並行的動作,

Python3 執行緒中常用的兩個模組為:

  • _thread
  • threading(推薦使用)

thread 模組已被廢棄。使用者可以使用 threading 模組代替。所以,在 Python3 中不能再使用"thread" 模組。為了相容性,Python3 將 thread 重新命名為 "_thread"。

threading 模組除了包含 _thread 模組中的所有方法外,還提供的其他方法:

  • threading.currentThread(): 返回當前的執行緒變數。
  • threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  • threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。

除了使用方法外,執行緒模組同樣提供了Thread類來處理執行緒,Thread類提供了以下方法:

  • run():用以表示執行緒活動的方法。
  • start():啟動執行緒活動。
  • join([time]):等待至執行緒中止。這阻塞呼叫執行緒直至執行緒的join() 方法被呼叫中止-正常退出或者丟擲未處理的異常-或者是可選的超時發生。
  • isAlive():返回執行緒是否活動的。
  • getName():返回執行緒名。
  • setName():設定執行緒名。

所以這裡我們直接學習threading

threading模組

普通建立方式

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=run, args=("t1",))
    t2 = threading.Thread(target=run, args=("t2",))
    t1.start()
    t2.start()

----------------------------------

>>> task t1
>>> task t2
>>> 2s
>>> 2s
>>> 1s
>>> 1s
>>> 0s
>>> 0s

自定義執行緒

繼承threading.Thread來自定義執行緒類,其本質是重構Thread類中的run方法

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重構run函式必須要寫
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)

if __name__ == "__main__":
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t1.start()
    t2.start()
    
----------------------------------

>>> task t1
>>> task t2
>>> 2s
>>> 2s
>>> 1s
>>> 1s
>>> 0s
>>> 0s

守護執行緒

我們看下面這個例子,這裡使用setDaemon(True)把所有的子執行緒都變成了主執行緒的守護執行緒,因此當主程序結束後,子執行緒也會隨之結束。所以當主執行緒結束後,整個程式就退出了。

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子執行緒停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   #把子程序設定為守護執行緒,必須在start()之前設定
    t.start()
    print("end")
    
----------------------------------

>>> task t1
>>> end

我們可以發現,設定守護執行緒之後,當主執行緒結束時,子執行緒也將立即結束,不再執行。

如果不設定守護執行緒

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子執行緒停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    # t.setDaemon(True)   #把子程序設定為守護執行緒,必須在start()之前設定
    t.start()
    print("end")

>>>  taskend
>>>   t1
>>>  3
>>>  2
>>>  1

主執行緒等待子執行緒結束

為了讓守護執行緒執行結束之後,主執行緒再結束,我們可以使用join方法,讓主執行緒等待子執行緒執行。

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子執行緒停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

if __name__ == '__main__':
    t = threading.Thread(target=run, args=("t1",))
    t.setDaemon(True)   #把子程序設定為守護執行緒,必須在start()之前設定
    t.start()
    t.join() # 設定主執行緒等待子執行緒結束
    print("end")

----------------------------------

>>> task t1
>>> 3
>>> 2
>>> 1
>>> end

我們只對上面的程式加了個join()方法,用於等待執行緒終止。join()的作用是,在子執行緒完成執行之前,這個子執行緒的父執行緒將一直被阻塞。

多執行緒共享全域性變數

執行緒是程序的執行單元,程序是系統分配資源的最小單位,所以在同一個程序中的多執行緒是共享資源的。

  

import threading
import time

g_num = 100

def work1():
    global g_num
    for i in range(3):
        g_num += 1
    print("in work1 g_num is : %d" % g_num)

def work2():
    global g_num
    print("in work2 g_num is : %d" % g_num)

if __name__ == '__main__':
    t1 = threading.Thread(target=work1)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=work2)
    t2.start()

----------------------------------

>>> in work1 g_num is : 103
>>> in work2 g_num is : 103

互斥鎖

由於執行緒之間是進行隨機排程,並且每個執行緒可能只執行n條執行之後,當多個執行緒同時修改同一條資料時可能會出現髒資料,所以,出現了執行緒鎖,即同一時刻允許一個執行緒執行操作。執行緒鎖用於鎖定資源,你可以定義多個鎖, 像下面的程式碼, 當你需要獨佔某一資源時,任何一個鎖都可以鎖這個資源,就好比你用不同的鎖都可以把相同的一個門鎖住是一個道理。

由於執行緒之間是進行隨機排程,如果有多個執行緒同時操作一個物件,如果沒有很好地保護該物件,會造成程式結果的不可預期,我們也稱此為“執行緒不安全”。

為了方式上面情況的發生,就出現了互斥鎖(Lock)

import threading
import time

# 定義一個全域性變數
g_num = 0
def test1(num):
    global g_num

    for i in range(num):

        mutex.acquire()  # 上鎖 注意了此時鎖的程式碼越少越好
        g_num += 1
        mutex.release()  # 解鎖
        
    print("-----in test1 g_num=%d----" % g_num)


def test2(num):
    global g_num
    for i in range(num):
        mutex.acquire()  # 上鎖
        g_num += 1
        mutex.release()  # 解鎖
    print("-----in test2 g_num=%d=----" % g_num)


# 建立一個互斥鎖,預設是沒有上鎖的
mutex = threading.Lock()


def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))

    t1.start()
    t2.start()

    # 等待上面的2個執行緒執行完畢....
    time.sleep(2)

    print("-----in main Thread g_num = %d---" % g_num)

if __name__ == "__main__":
    main()

死鎖:線上程間共享多個資源的時候,如果兩個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖

在使用鎖的時候要注意不要寫出死鎖程式碼,附死鎖程式碼參考,總結一句就是互相持有對方執行緒所需要的鎖,造成死鎖

import threading

a = 100


def func1():
    global a
    for i in range(1000000):
        meta_A.acquire()  # 上鎖
        meta_B.acquire()  # 上多把鎖 產生了死鎖 看下面程式碼
        print('-------------1')
        a += 1
        meta_B.release()
        meta_A.release()  # 釋放鎖
    print(a)


def func2():
    global a
    for i in range(1000000):
        meta_B.acquire()
        meta_A.acquire()
        print('------------2')
        a += 1
        meta_A.release()
        meta_B.release()
    print(a)


# 建立鎖
meta_A = threading.Lock()
meta_B = threading.Lock()


t1 = threading.Thread(target=func1) 
t2 = threading.Thread(target=func2)
t1.start()
t2.start()

RLcok類的用法和Lock類一模一樣,但它支援巢狀,在多個鎖沒有釋放的時候一般會使用RLcok類。

RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。

import threading

lock = threading.RLock()
num = 0


def sum1():
    lock.acquire()
    global num
    num += 1
    print("from sum1")
    lock.release()
    return num


def sum2():
    lock.acquire()
    res = sum1()
    lock.release()
    print(res)


for i in range(10):
    t = threading.Thread(target=sum2)
    t.start()

訊號量(BoundedSemaphore類)

互斥鎖同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。

import threading
import time

def run(n, semaphore):
    semaphore.acquire()   #加鎖
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()     #釋放

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # 最多允許5個執行緒同時執行
    for i in range(22):
        t = threading.Thread(target=run, args=("t-%s" % i, semaphore))
        t.start()
    while threading.active_count() != 1:
        pass  # print threading.active_count()
    else:
        print('-----all threads done-----')

事件(Event類)

python執行緒的事件用於主執行緒控制其他執行緒的執行,事件是一個簡單的執行緒同步物件,其主要提供以下幾個方法:

  • clear 將flag設定為“False”
  • set 將flag設定為“True”
  • is_set 判斷是否設定了flag
  • wait 會一直監聽flag,如果沒有檢測到flag就一直處於阻塞狀態

事件處理的機制:全域性定義了一個“Flag”,當flag值為“False”,那麼event.wait()就會阻塞,當flag值為“True”,那麼event.wait()便不再阻塞。

#利用Event類模擬紅綠燈
import threading
import time

event = threading.Event()


def lighter():
    count = 0
    event.set()     #初始值為綠燈
    while True:
        if 5 < count <=10 :
            event.clear()  # 紅燈,清除標誌位
            print("\33[41;1mred light is on...\033[0m")
        elif count > 10:
            event.set()  # 綠燈,設定標誌位
            count = 0
        else:
            print("\33[42;1mgreen light is on...\033[0m")

        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():      #判斷是否設定了標誌位
            print("[%s] running..."%name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."%name)
            event.wait()
            print("[%s] green light is on,start going..."%name)

light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car,args=("MINI",))
car.start()

GIL(Global Interpreter Lock)全域性直譯器鎖

在非python環境中,單核情況下,同時只能有一個任務執行。多核時可以支援多個執行緒同時執行。但是在python中,無論有多少核,同時只能執行一個執行緒。究其原因,這就是由於GIL的存在導致的。

GIL的全稱是Global Interpreter Lock(全域性直譯器鎖),來源是python設計之初的考慮,為了資料安全所做的決定。某個執行緒想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,並且在一個python程序中,GIL只有一個。拿不到通行證的執行緒,就不允許進入CPU執行。GIL只在cpython中才有,因為cpython呼叫的是c語言的原生執行緒,所以他不能直接操作cpu,只能利用GIL保證同一時間只能有一個執行緒拿到資料。而在pypy和jpython中是沒有GIL的。

Python多執行緒的工作過程:
python在使用多執行緒的時候,呼叫的是c語言的原生執行緒。

  • 拿到公共資料
  • 申請gil
  • python直譯器呼叫os原生執行緒
  • os操作cpu執行運算
  • 當該執行緒執行時間到後,無論運算是否已經執行完,gil都被要求釋放
  • 進而由其他程序重複上面的過程
  • 等其他程序執行完後,又會切換到之前的執行緒(從他記錄的上下文繼續執行),整個過程是每個執行緒執行自己的運算,當執行時間到就進行切換(context switch)。

python針對不同型別的程式碼執行效率也是不同的:

1、CPU密集型程式碼(各種迴圈處理、計算等等),在這種情況下,由於計算工作多,ticks計數很快就會達到閾值,然後觸發GIL的釋放與再競爭(多個執行緒來回切換當然是需要消耗資源的),所以python下的多執行緒對CPU密集型程式碼並不友好。
2、IO密集型程式碼(檔案處理、網路爬蟲等涉及檔案讀寫的操作),多執行緒能夠有效提升效率(單執行緒下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多執行緒能線上程A等待時,自動切換到執行緒B,可以不浪費CPU的資源,從而能提升程式執行效率)。所以python的多執行緒對IO密集型程式碼比較友好。

使用建議?

python下想要充分利用多核CPU,就用多程序。因為每個程序有各自獨立的GIL,互不干擾,這樣就可以真正意義上的並行執行,在python中,多程序的執行效率優於多執行緒(僅僅針對多核CPU而言)。

GIL在python中的版本差異:

1、在python2.x裡,GIL的釋放邏輯是當前執行緒遇見IO操作或者ticks計數達到100時進行釋放。(ticks可以看作是python自身的一個計數器,專門做用於GIL,每次釋放後歸零,這個計數可以通過sys.setcheckinterval 來調整)。而每次釋放GIL鎖,執行緒進行鎖競爭、切換執行緒,會消耗資源。並且由於GIL鎖存在,python裡一個程序永遠只能同時執行一個執行緒(拿到GIL的執行緒才能執行),這就是為什麼在多核CPU上,python的多執行緒效率並不高。
2、在python3.x中,GIL不使用ticks計數,改為使用計時器(執行時間達到閾值後,當前執行緒釋放GIL),這樣對CPU密集型程式更加友好,但依然沒有解決GIL導致的同一時間只能執行一個執行緒的問題,所以效率依然不盡如人意。