1. 程式人生 > >Python GIL(Global Interpreter Lock)

Python GIL(Global Interpreter Lock)

proc bsp rep 串行 return test 的人 multiple mutex

一,介紹

定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)

  結論:在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,無法利用多核優勢

  

    首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時
所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成
可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣
一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython
就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念
裏CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。

    所以這裏要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL

二,GIL介紹

  GIL本質就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

  可以肯定的一點是:保護不同的數據的安全,就應該加不同的鎖。

  要想了解GIL,首先確定一點:每次執行python程序,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生3個不同的python進程

驗證python test.py 只會產生一個進程

‘‘‘
#驗證python test.py只會產生一個進程
#test.py內容
import os,time
print(os.getpid())
time.sleep(1000)
‘‘‘
python3 test.py 
#在windows下
tasklist |findstr python
#在linux下
ps aux |grep python

  在一個python的進程內,不僅有test.py的主線程或者由該主線程開啟的其他線程,還有解釋器開啟的垃圾回收等解釋器級別的線程,總之,所有線程都運行在這一個進程內,毫無疑問

    1 所有數據都是共享的,這其中,代碼作為一種數據也是被所有線程共享的
(test.py的所有代碼以及Cpython解釋器的所有代碼)

    例如:test.py定義一個函數work(代碼內容如下圖),在進程內所有線程都能訪問到work的代碼,
於是我們可以開啟三個線程然後target都指向該代碼,能訪問到意味著就是可以執行。

    2 所有線程的任務,都需要將任務的代碼當做參數傳給解釋器的代碼去執行,即所有
的線程要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的代碼。

綜上:

  如果多個線程的target=work,那麽執行流程是

  多個線程先訪問到解釋器的代碼,即拿到執行權限,然後將target的代碼交給解釋器的代碼去執行

  釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就導致了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什麽高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼

技術分享圖片

三,GIL與Lock

  機智的同學可能會問到這個問題:Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什麽這裏還需要lock?

  首先,我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據

  然後,我們可以得出結論:保護不同的數據就應該加不同的鎖。

  最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

  GIL保護的是解釋器級別的數據,保護用戶自己的數據則需要自己加鎖處理,如下圖:

技術分享圖片

分析:

    1、100個線程去搶GIL鎖,即搶執行權限

    2、肯定有一個線程先搶到GIL(暫且稱為線程1),然後開始執行,一旦執行就會拿到lock.acquire()

    3、極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然後開始運行,但
線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL

    4、直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥
鎖lock,然後其他的線程再重復2 3 4的過程

  

代碼示例:

# _*_ coding: utf-8 _*_ 
from threading import Thread
from threading import Lock
import time

n =100
def task():
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    mutex.release()

if __name__ == ‘__main__‘:
    mutex = Lock()
    t_l = []
    for i in range(100):
        t = Thread(target=task)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    print("主",n)

  結果:肯定為0,由原來的並發執行變為串行,犧牲了執行效率保證了數據安全,不加鎖則結果可能為99

主 0

  

四,GIL與多線程

  有了GIL的存在,同一時刻同一進程中只有一個線程被執行

  聽到這裏,有的同學立馬質問:進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢,也就是說python沒用了?

所以說 要解決這個問題,我們需要在幾個點上達成一致:

    1. cpu到底是用來做計算的,還是用來做I/O的?

    2. 多cpu,意味著可以有多個核並行完成計算,所以多核提升的是計算性能

    3. 每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什麽用處 

  

  一個工人相當於cpu,此時計算相當於工人在幹活,I/O阻塞相當於為工人幹活提供所需原材料的過程,工人幹活的過程中如果沒有原材料了,則工人幹活的過程需要停止,直到等待原材料的到來。

  如果你的工廠幹的大多數任務都要有準備原材料的過程(I/O密集型),那麽你有再多的工人,意義也不大,還不如一個人,在等材料的過程中讓工人去幹別的活,

  反過來講,如果你的工廠原材料都齊全,那當然是工人越多,效率越高

結論:

  對計算來說,cpu越多越好,但是對於I/O來說,再多的cpu也沒用

  當然對運行一個程序來說,隨著cpu的增多執行效率肯定會有所提高(不管
提高幅度多大,總會有所提高),這是因為一個程序基本上不會是純計算或者
純I/O,所以我們只能相對的去看一個程序到底是計算密集型還是I/O密集型,
從而進一步分析python的多線程到底有無用武之地

假設我們有四個任務需要處理,處理方式肯定是需要玩出並發的效果,解決方案可以是:

    方案一:開啟四個進程

    方案二:一個進程下,開啟四個線程

單核情況下,分析結果:

    如果四個任務是計算密集型,沒有多核來並行計算,方案一徒增了創建進程的開銷。方案二勝
    
    如果四個任務是I/O密集型,方案一創建進程的開銷大,且金成德切換速度遠不如線程,方案二勝

多核情況下,分析結果:

    如果四個任務是計算密集型,多核意味著並行計算,在python中一個進程中同一時刻只有一個線程執行,並不上多核。方案一勝
    
    如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

結論:

    現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來
多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯
著提升的。

  

五,多線程性能測試

如果並發的多個任務是計算密集型:多進程效率高

# _*_ coding: utf-8 _*_
#計算密集型用多進程
from multiprocessing import Process
from threading import Thread
import os
import time

def work():
    res = 0
    for i in range(100000000):
        res *= 1

if  __name__ == ‘__main__‘:
    l = []
    print(os.cpu_count())
    start = time.time()
    for i in range(8):
        # p = Process(target=work)
        #run time is :43.401108741760254
        t = Thread(target=work)
        #run time is : 62.395447731018066
        # l.append(p)
        # p.start()
        l.append(t)
        t.start()
    for t in l:
        t.join()
    # for p in l:
    #     p.join()
    stop = time.time()
    print(‘run time is :‘,(stop-start))

  

如果並發的多個任務是I/O密集型:多線程效率高

#IO密集型用多線程
from multiprocessing import Process
from threading import Thread
import os
import time

def work():
    time.sleep(0.5)

if  __name__ == ‘__main__‘:
    l = []
    print(os.cpu_count())
    start = time.time()
    for i in range(400):
        # p = Process(target=work)  #run time is : 39.320624113082886
        p = Thread(target=work)      #run time is : 0.5927295684814453
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop = time.time()
    print(‘run time is :‘,(stop-start))

  

應用:

    多線程用於IO密集型,如socket 爬蟲 ,web

    多進程用於計算密集型,如金融分析

  

六,死鎖現象

  所謂死鎖就是指兩個或者兩個以上的進程或者線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,他們都將無法推進下去,此時稱系統處於死鎖狀況或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print(‘\033[41m%s 拿到A鎖\033[0m‘ %self.name)

        mutexB.acquire()
        print(‘\033[42m%s 拿到B鎖\033[0m‘ %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print(‘\033[43m%s 拿到B鎖\033[0m‘ %self.name)
        time.sleep(2)

        mutexA.acquire()
        print(‘\033[44m%s 拿到A鎖\033[0m‘ %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t=MyThread()
        t.start()

  執行效果

Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖




 #出現死鎖,整個程序阻塞住

七,遞歸鎖

  死鎖的解決方法是是使用遞歸鎖,遞歸鎖,是在python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock

  這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次。

from threading import Thread,RLock
import time

mutexA=mutexB=RLock()
 #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,
#這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print(‘\033[41m%s 拿到A鎖\033[0m‘ %self.name)

        mutexB.acquire()
        print(‘\033[42m%s 拿到B鎖\033[0m‘ %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print(‘\033[43m%s 拿到B鎖\033[0m‘ %self.name)
        time.sleep(2)

        mutexA.acquire()
        print(‘\033[44m%s 拿到A鎖\033[0m‘ %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t=MyThread()
        t.start()

  結果:

Thread-1 拿到了A鎖
Thread-1 拿到了B鎖
Thread-1 拿到了B鎖
Thread-1 拿到了A鎖
Thread-2 拿到了A鎖
Thread-2 拿到了B鎖
Thread-2 拿到了B鎖
Thread-2 拿到了A鎖
Thread-4 拿到了A鎖
Thread-4 拿到了B鎖
Thread-4 拿到了B鎖
Thread-4 拿到了A鎖
Thread-6 拿到了A鎖
Thread-6 拿到了B鎖
Thread-6 拿到了B鎖
Thread-6 拿到了A鎖
Thread-8 拿到了A鎖
Thread-8 拿到了B鎖
Thread-8 拿到了B鎖
Thread-8 拿到了A鎖
Thread-10 拿到了A鎖
Thread-10 拿到了B鎖
Thread-10 拿到了B鎖
Thread-10 拿到了A鎖
Thread-5 拿到了A鎖
Thread-5 拿到了B鎖
Thread-5 拿到了B鎖
Thread-5 拿到了A鎖
Thread-9 拿到了A鎖
Thread-9 拿到了B鎖
Thread-9 拿到了B鎖
Thread-9 拿到了A鎖
Thread-7 拿到了A鎖
Thread-7 拿到了B鎖
Thread-7 拿到了B鎖
Thread-7 拿到了A鎖
Thread-3 拿到了A鎖
Thread-3 拿到了B鎖
Thread-3 拿到了B鎖
Thread-3 拿到了A鎖

八,信號量

  信號量也是一把鎖,可以指定信號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間可以有5個任務拿到鎖去執行,如果說互斥鎖是合租房屋的人去搶一個廁所,那麽信號量就相當於一群路人爭搶公共廁所,公共廁所有多個坑位,這意味著同一時間可以有多個人上公共廁所,但公共廁所容納的人數是一定的,這便是信號量的大小

from threading import Thread,Semaphore
import threading
import time

def func():
    sm.acquire()
    print(‘%s get sm‘ %threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__ == ‘__main__‘:
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

  解析:

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

  與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

九,Event

  同進程的一樣

  線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。

技術分享圖片

技術分享圖片

  例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那麽我們就可以采用threading.Event機制來協調各個工作線程的連接操作

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError(‘鏈接超時‘)
        print(‘<%s>第%s次嘗試鏈接‘ % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print(‘<%s>鏈接成功‘ %threading.current_thread().getName())


def check_mysql():
    print(‘\033[45m[%s]正在檢查mysql\033[0m‘ % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == ‘__main__‘:
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

十,條件Condition(了解)

  使線程等待,只有滿足了某條件時,才能釋放n個線程。

import threading
 
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" %n)
    con.release()
 
if __name__ == ‘__main__‘:
 
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
 
    while True:
        inp = input(‘>>>‘)
        if inp == ‘q‘:
            break
        con.acquire()
        con.notify(int(inp))
        con.release()

def condition_func():

    ret = False
    inp = input(‘>>>‘)
    if inp == ‘1‘:
        ret = True

    return ret


def run(n):
    con.acquire()
    con.wait_for(condition_func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == ‘__main__‘:

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

  

十一,定時器

  定時器指定n秒後執行某操作,比如定時炸彈

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

  驗證碼定時器

from threading import Timer
import random,time

class Code:
    def __init__(self):
        self.make_cache()

    def make_cache(self,interval=5):
        self.cache=self.make_code()
        print(self.cache)
        self.t=Timer(interval,self.make_cache)
        self.t.start()

    def make_code(self,n=4):
        res=‘‘
        for i in range(n):
            s1=str(random.randint(0,9))
            s2=chr(random.randint(65,90))
            res+=random.choice([s1,s2])
        return res

    def check(self):
        while True:
            inp=input(‘>>: ‘).strip()
            if inp.upper() ==  self.cache:
                print(‘驗證成功‘,end=‘\n‘)
                self.t.cancel()
                break


if __name__ == ‘__main__‘:
    obj=Code()
    obj.check()

驗證碼定時器

  

Python GIL(Global Interpreter Lock)