1. 程式人生 > >20190103(GIL,池,阻塞,同步非同步)

20190103(GIL,池,阻塞,同步非同步)

GIL鎖

什麼是GIL

GIL全域性直譯器鎖,是防止多個執行緒在同一時間同時執行的、CPython直譯器特有的一種互斥鎖。 每一個py檔案都會有自己的直譯器,也就是說不同py檔案的GIL都是獨立的, ps:分散於不同程序內的執行緒不會去爭搶同一把GIL,只有同一個程序的多個執行緒才爭搶同一把GIL。

為什麼需要GIL

CPython直譯器的記憶體管理機制是非執行緒安全的,所以通過GIL使程式序列,可以保證資料安全,但會降低執行效率。 非執行緒安全屬於歷史遺留問題,而一旦修改,原來基於GIL的程式都要修改。

GIL帶來的問題

即便多核處理器,也無法真正並行(可以併發),程序內執行緒只能併發,同一時刻只有一個執行緒可以執行

GIL的加鎖和解鎖時機(待補充)

加鎖的時機:在呼叫直譯器時立即加鎖

解鎖時機:

  • 當前執行緒遇到了IO時釋放

  • 當前執行緒執行時間超過設定值時釋放,直譯器會檢測執行緒的執行時間,一旦到達某個閾值,通知執行緒儲存狀態切換執行緒,以此來保證資料安全

多程序與多執行緒的使用情況分析:

1、單核時,無論是IO密集還是計算密集,GIL都沒有影響; 2、多核下,IO密集型會受到GIL的影響,但是影響不大,此時優先考慮多執行緒處理資料; 3、多核下,計算密集型受到GIL影響較大,此時使用多程序可以提高效率,因為CPython多執行緒無法並行,但多程序可以。

程式執行時間對比:

# 純計算,大量,程序更快(如果只是少量計算,執行緒未必慢,因為開啟程序比開啟執行緒更費時間)
from threading import  Thread
from multiprocessing import  Process
import time


a = 1
def task():
   global a
   for i in range(10000000):
       a +=1
       a * 10 / 2 - 3


# 多執行緒
s = time.time()
t1 = Thread(target=task)
t2 = Thread(target=task)
t3 = Thread(target=task)
t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()
print(time.time()-s)

# 多程序
if __name__ == '__main__':
   s = time.time()
   t1 = Process(target=task)
   t2 = Process(target=task)
   t3 = Process(target=task)
   t1.start()
   t2.start()
   t3.start()

   t1.join()
   t2.join()
   t3.join()

   print(time.time()-s)

 

GIL與自定義互斥鎖的異同

GIL作用於全域性,自定義互斥鎖作用於區域性 執行緒必須先獲取GIL才能去拿自定義互斥鎖 單程序內的所有執行緒都會去搶GIL 單程序內的只有一部分執行緒會去搶自定義的互斥鎖

程序池與執行緒池

是一種容器,本質是儲存程序或執行緒的列表 IO密集使用執行緒池,計算密集使用程序池 執行緒/程序池不僅幫我們控制執行緒/程序的數量,還幫我們完成了執行緒/程序的建立,銷燬,以及任務的分配

池與訊號量的區別:

訊號量的作用是同一時間某一個數據能被指定數量的程序或執行緒訪問 而池控制的是數量,沒有對資料訪問進行限制

程序池:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,os

# 建立程序池,指定最大程序數為3,此時不會建立程序,不指定數量時,預設為CPU核數
pool = ProcessPoolExecutor(3)

def task():
   time.sleep(1)
   print(os.getpid(),"working..")

if __name__ == '__main__':
   for i in range(10):
       pool.submit(task) # 提交任務時立即建立程序

   # 任務執行完成後也不會立即銷燬程序
   time.sleep(2)

   for i in range(10):
       pool.submit(task) #再有新任務時 直接使用之前已經建立好的程序來執行

輸出結果:(每次可能都不同)
13760 working..
26536 working..
21388 working..
13760 working..
26536 working..
21388 working..
26536 working..
13760 working..
21388 working..
26536 working..
13760 working..
21388 working..
13760 working..
26536 working..
21388 working..
13760 working..
26536 working..
21388 working..
13760 working..
26536 working..

執行緒池:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread,active_count
import time,os

# 建立程序池,指定最大執行緒數為3,此時不會建立執行緒,不指定數量時,預設為CPU和核數*5
pool = ThreadPoolExecutor(3)
print(active_count()) # 只有一個主線

def task():
   time.sleep(1)
   print(current_thread().name,"working..")


for i in range(10):
   pool.submit(task) # 第一次提交任務時立即建立執行緒
# 任務執行完成後也不會立即銷燬
time.sleep(2)

for i in range(10):
   pool.submit(task) #再有新任務是 直接使用之前已經建立好的執行緒來執行

阻塞vs非阻塞

阻塞、就緒、執行指的是程序的三種執行狀態。 阻塞=》遇到io操作,無法繼續執行,並且立刻釋放CPU資源 非阻塞=》正常執行中(就緒、執行),沒有遇到IO操作,或者通過某種手段讓程式即便遇到IO也不會停在原地,而是執行其他操作。

總結:阻塞與非阻塞指的是程式的執行狀態

同步與非同步

同步與非同步:指的是提交任務的兩種方式

同步(呼叫/執行/任務/提交):提交完任務後就在原地等待,拿到返回值後才能繼續執行下一行程式碼(可能是IO,也可能是因為計算時間較長等原因)。 非同步(呼叫/執行/任務/提交):提交完任務後不在原地等待,不關心任務的結果,直接執行下一行程式碼。

非同步效率高於同步。

非同步提交任務時,任務可能包含IO操作,非同步也可能阻塞。 同步提交任務時,也可能會因為計算複雜而卡,不等同與阻塞。

 

程式中的非同步呼叫並獲取結果:

from concurrent.futures import ThreadPoolExecutor  # 匯入執行緒池
from threading import current_thread  # 從執行緒中匯入檢視當前執行緒的方法
import time

pool = ThreadPoolExecutor(3)


def task(i):
   time.sleep(0.01)
   print(current_thread().name,"working..")
   return i ** i
objs = []
for i in range(1,5):
   res_obj = pool.submit(task,i) # 非同步方式提交任務,會返回一個物件用於表示任務結果,即pool.submit()會生成物件
   objs.append(res_obj)

pool.shutdown(wait=True)  # shutdown是一個阻塞函式,指的是不能再往程序池內提交任務,wait=True等待程序池或執行緒池內所有的任務都執行完畢,才允許程式碼往下繼續執行。

# 從結果物件中取出執行結果
for res_obj in objs:  # 從列表中拿出物件
   print(res_obj.result()) # 由submit得到的物件會有result()方法,即物件內封裝的返回值
print("over")

 

程式中的同步呼叫並獲取結果:(等到返回結果才能繼續執行)

from  concurrent.futures import ThreadPoolExecutor
import time,random

def task(i):
   print("%s run"%i)
   time.sleep(random.randint(1,3))
   return i**2  # 返回值,這也是result方法的要求。
pool = ThreadPoolExecutor(3)
for i in range(6):
   p = pool.submit(task,i)
   res = p.result()
   print(res)
print("over")
輸出結果:
0 run
0
1 run
1
2 run
4
3 run
9
4 run
16
5 run
25
over