Python GIL(Global Interpreter Lock)
一,介紹
定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
結論:在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,無法利用多核優勢
首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時 所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成 可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣 一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython 就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念 裏CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。 所以這裏要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL
二,GIL介紹
GIL本質就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。
可以肯定的一點是:保護不同的數據的安全,就應該加不同的鎖。
要想了解GIL,首先確定一點:每次執行python程序,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生3個不同的python進程
驗證python test.py 只會產生一個進程
‘‘‘ #驗證python test.py只會產生一個進程 #test.py內容 import os,time print(os.getpid()) time.sleep(1000) ‘‘‘ python3 test.py #在windows下 tasklist |findstr python #在linux下 ps aux |grep python
在一個python的進程內,不僅有test.py的主線程或者由該主線程開啟的其他線程,還有解釋器開啟的垃圾回收等解釋器級別的線程,總之,所有線程都運行在這一個進程內,毫無疑問
1 所有數據都是共享的,這其中,代碼作為一種數據也是被所有線程共享的 (test.py的所有代碼以及Cpython解釋器的所有代碼) 例如:test.py定義一個函數work(代碼內容如下圖),在進程內所有線程都能訪問到work的代碼, 於是我們可以開啟三個線程然後target都指向該代碼,能訪問到意味著就是可以執行。 2 所有線程的任務,都需要將任務的代碼當做參數傳給解釋器的代碼去執行,即所有 的線程要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的代碼。
綜上:
如果多個線程的target=work,那麽執行流程是
多個線程先訪問到解釋器的代碼,即拿到執行權限,然後將target的代碼交給解釋器的代碼去執行
解釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就導致了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什麽高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼
三,GIL與Lock
機智的同學可能會問到這個問題:Python已經有一個GIL來保證同一時間只能有一個線程來執行了,為什麽這裏還需要lock?
首先,我們需要達成共識:鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據
然後,我們可以得出結論:保護不同的數據就應該加不同的鎖。
最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock
GIL保護的是解釋器級別的數據,保護用戶自己的數據則需要自己加鎖處理,如下圖:
分析:
1、100個線程去搶GIL鎖,即搶執行權限 2、肯定有一個線程先搶到GIL(暫且稱為線程1),然後開始執行,一旦執行就會拿到lock.acquire() 3、極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然後開始運行,但 線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL 4、直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥 鎖lock,然後其他的線程再重復2 3 4的過程
代碼示例:
# _*_ coding: utf-8 _*_ from threading import Thread from threading import Lock import time n =100 def task(): global n mutex.acquire() temp = n time.sleep(0.1) n = temp - 1 mutex.release() if __name__ == ‘__main__‘: mutex = Lock() t_l = [] for i in range(100): t = Thread(target=task) t_l.append(t) t.start() for t in t_l: t.join() print("主",n)
結果:肯定為0,由原來的並發執行變為串行,犧牲了執行效率保證了數據安全,不加鎖則結果可能為99
主 0
四,GIL與多線程
有了GIL的存在,同一時刻同一進程中只有一個線程被執行
聽到這裏,有的同學立馬質問:進程可以利用多核,但是開銷大,而python的多線程開銷小,但卻無法利用多核優勢,也就是說python沒用了?
所以說 要解決這個問題,我們需要在幾個點上達成一致:
1. cpu到底是用來做計算的,還是用來做I/O的? 2. 多cpu,意味著可以有多個核並行完成計算,所以多核提升的是計算性能 3. 每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什麽用處
一個工人相當於cpu,此時計算相當於工人在幹活,I/O阻塞相當於為工人幹活提供所需原材料的過程,工人幹活的過程中如果沒有原材料了,則工人幹活的過程需要停止,直到等待原材料的到來。
如果你的工廠幹的大多數任務都要有準備原材料的過程(I/O密集型),那麽你有再多的工人,意義也不大,還不如一個人,在等材料的過程中讓工人去幹別的活,
反過來講,如果你的工廠原材料都齊全,那當然是工人越多,效率越高
結論:
對計算來說,cpu越多越好,但是對於I/O來說,再多的cpu也沒用 當然對運行一個程序來說,隨著cpu的增多執行效率肯定會有所提高(不管 提高幅度多大,總會有所提高),這是因為一個程序基本上不會是純計算或者 純I/O,所以我們只能相對的去看一個程序到底是計算密集型還是I/O密集型, 從而進一步分析python的多線程到底有無用武之地
假設我們有四個任務需要處理,處理方式肯定是需要玩出並發的效果,解決方案可以是:
方案一:開啟四個進程 方案二:一個進程下,開啟四個線程
單核情況下,分析結果:
如果四個任務是計算密集型,沒有多核來並行計算,方案一徒增了創建進程的開銷。方案二勝 如果四個任務是I/O密集型,方案一創建進程的開銷大,且金成德切換速度遠不如線程,方案二勝
多核情況下,分析結果:
如果四個任務是計算密集型,多核意味著並行計算,在python中一個進程中同一時刻只有一個線程執行,並不上多核。方案一勝 如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝
結論:
現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來 多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯 著提升的。
五,多線程性能測試
如果並發的多個任務是計算密集型:多進程效率高
# _*_ coding: utf-8 _*_ #計算密集型用多進程 from multiprocessing import Process from threading import Thread import os import time def work(): res = 0 for i in range(100000000): res *= 1 if __name__ == ‘__main__‘: l = [] print(os.cpu_count()) start = time.time() for i in range(8): # p = Process(target=work) #run time is :43.401108741760254 t = Thread(target=work) #run time is : 62.395447731018066 # l.append(p) # p.start() l.append(t) t.start() for t in l: t.join() # for p in l: # p.join() stop = time.time() print(‘run time is :‘,(stop-start))
如果並發的多個任務是I/O密集型:多線程效率高
#IO密集型用多線程 from multiprocessing import Process from threading import Thread import os import time def work(): time.sleep(0.5) if __name__ == ‘__main__‘: l = [] print(os.cpu_count()) start = time.time() for i in range(400): # p = Process(target=work) #run time is : 39.320624113082886 p = Thread(target=work) #run time is : 0.5927295684814453 l.append(p) p.start() for p in l: p.join() stop = time.time() print(‘run time is :‘,(stop-start))
應用:
多線程用於IO密集型,如socket 爬蟲 ,web 多進程用於計算密集型,如金融分析
六,死鎖現象
所謂死鎖就是指兩個或者兩個以上的進程或者線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,他們都將無法推進下去,此時稱系統處於死鎖狀況或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。
from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print(‘\033[41m%s 拿到A鎖\033[0m‘ %self.name) mutexB.acquire() print(‘\033[42m%s 拿到B鎖\033[0m‘ %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print(‘\033[43m%s 拿到B鎖\033[0m‘ %self.name) time.sleep(2) mutexA.acquire() print(‘\033[44m%s 拿到A鎖\033[0m‘ %self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t=MyThread() t.start()
執行效果
Thread-1 拿到A鎖 Thread-1 拿到B鎖 Thread-1 拿到B鎖 Thread-2 拿到A鎖 #出現死鎖,整個程序阻塞住
七,遞歸鎖
死鎖的解決方法是是使用遞歸鎖,遞歸鎖,是在python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock
這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次。
from threading import Thread,RLock import time mutexA=mutexB=RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1, #這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止 class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print(‘\033[41m%s 拿到A鎖\033[0m‘ %self.name) mutexB.acquire() print(‘\033[42m%s 拿到B鎖\033[0m‘ %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print(‘\033[43m%s 拿到B鎖\033[0m‘ %self.name) time.sleep(2) mutexA.acquire() print(‘\033[44m%s 拿到A鎖\033[0m‘ %self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t=MyThread() t.start()
結果:
Thread-1 拿到了A鎖 Thread-1 拿到了B鎖 Thread-1 拿到了B鎖 Thread-1 拿到了A鎖 Thread-2 拿到了A鎖 Thread-2 拿到了B鎖 Thread-2 拿到了B鎖 Thread-2 拿到了A鎖 Thread-4 拿到了A鎖 Thread-4 拿到了B鎖 Thread-4 拿到了B鎖 Thread-4 拿到了A鎖 Thread-6 拿到了A鎖 Thread-6 拿到了B鎖 Thread-6 拿到了B鎖 Thread-6 拿到了A鎖 Thread-8 拿到了A鎖 Thread-8 拿到了B鎖 Thread-8 拿到了B鎖 Thread-8 拿到了A鎖 Thread-10 拿到了A鎖 Thread-10 拿到了B鎖 Thread-10 拿到了B鎖 Thread-10 拿到了A鎖 Thread-5 拿到了A鎖 Thread-5 拿到了B鎖 Thread-5 拿到了B鎖 Thread-5 拿到了A鎖 Thread-9 拿到了A鎖 Thread-9 拿到了B鎖 Thread-9 拿到了B鎖 Thread-9 拿到了A鎖 Thread-7 拿到了A鎖 Thread-7 拿到了B鎖 Thread-7 拿到了B鎖 Thread-7 拿到了A鎖 Thread-3 拿到了A鎖 Thread-3 拿到了B鎖 Thread-3 拿到了B鎖 Thread-3 拿到了A鎖
八,信號量
信號量也是一把鎖,可以指定信號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間可以有5個任務拿到鎖去執行,如果說互斥鎖是合租房屋的人去搶一個廁所,那麽信號量就相當於一群路人爭搶公共廁所,公共廁所有多個坑位,這意味著同一時間可以有多個人上公共廁所,但公共廁所容納的人數是一定的,這便是信號量的大小
from threading import Thread,Semaphore import threading import time def func(): sm.acquire() print(‘%s get sm‘ %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == ‘__main__‘: sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
解析:
Semaphore管理一個內置的計數器, 每當調用acquire()時內置計數器-1; 調用release() 時內置計數器+1; 計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。
與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
九,Event
同進程的一樣
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; event.clear():恢復event的狀態值為False。
例如,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那麽我們就可以采用threading.Event機制來協調各個工作線程的連接操作
from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError(‘鏈接超時‘) print(‘<%s>第%s次嘗試鏈接‘ % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1 print(‘<%s>鏈接成功‘ %threading.current_thread().getName()) def check_mysql(): print(‘\033[45m[%s]正在檢查mysql\033[0m‘ % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == ‘__main__‘: event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
十,條件Condition(了解)
使線程等待,只有滿足了某條件時,才能釋放n個線程。
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == ‘__main__‘: con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input(‘>>>‘) if inp == ‘q‘: break con.acquire() con.notify(int(inp)) con.release()
def condition_func(): ret = False inp = input(‘>>>‘) if inp == ‘1‘: ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == ‘__main__‘: con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()
十一,定時器
定時器指定n秒後執行某操作,比如定時炸彈
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
驗證碼定時器
from threading import Timer import random,time class Code: def __init__(self): self.make_cache() def make_cache(self,interval=5): self.cache=self.make_code() print(self.cache) self.t=Timer(interval,self.make_cache) self.t.start() def make_code(self,n=4): res=‘‘ for i in range(n): s1=str(random.randint(0,9)) s2=chr(random.randint(65,90)) res+=random.choice([s1,s2]) return res def check(self): while True: inp=input(‘>>: ‘).strip() if inp.upper() == self.cache: print(‘驗證成功‘,end=‘\n‘) self.t.cancel() break if __name__ == ‘__main__‘: obj=Code() obj.check() 驗證碼定時器
Python GIL(Global Interpreter Lock)