Python_多執行緒threading模組
python 在執行的時候會淡定的在CPU上只允許一個執行緒執行,故Python在多核CPU的情況下也只能發揮出單核的功能,其中的原因:gil鎖
gil 鎖 (全域性直譯器鎖):每個執行緒在執行時都需要先獲取gil 一個執行緒執行Python,而其他N個睡眠或者等待I/O(即 保證同一時刻只有一個執行緒丟共享資源進行存取)
多執行緒兩種呼叫方式:
import threading
import time
class Oh(threading.Thread): # 繼承
# 多執行緒繼承式呼叫
def __init__(self,num):
threading.Thread.__init__(self) # (經典類寫法)繼承父類構造方法,否則會覆蓋父類
self.num = num
def run(self): # 定義每個執行緒要執行的函式
print('我是一個數字:%s' % self.num)
time.sleep(3)
if __name__ == '__main__':
O1 = Oh(1)
O2 = Oh(2)
O1.start()
O2.start()
繼承式呼叫
import threading
import time
def Yes(num): # 定義要執行的函式
# 多執行緒直接式呼叫(常用)
print('列印了一個數:%s'% num)
time.sleep(3) # 執行完等三秒
if __name__ =='__main__':
y1 = threading.Thread(target=Yes, args=[1,]) # 例項化建立了一個執行緒
y2 = threading.Thread(target=Yes, args=[2,]) # 第二個執行緒
y1.start() # 執行緒開始
y2.start()
print(y1.getName()) # 列印執行緒的名字
print(y2.getName())
y1.join() # 主函式等待y1執行緒執行完過後再執行主執行緒
y2.join() # 執行緒為並行,全部執行完才一起等待 3 秒
print('我是最後被列印的東東。。') # 這是主執行緒執行的最後的東東
直接式呼叫
一、多執行緒方法
threading.enumerate() |
返回當前執行中活著的執行緒物件列表 |
threading.active_count() |
返回當前處於alive狀態的執行緒物件個數(包含主執行緒),等於enumerate的列表長度 |
threading.current_thread() |
返回當前的執行緒物件,對應於呼叫者控制的執行緒 |
threading.get_ident() |
返回當前程序的‘執行緒識別符號’ |
threading.main_thread() |
返回主執行緒物件 |
threading.stack_size() |
返回當建立一個新執行緒使用的執行緒棧大小,0 為預設配置 |
threading.TIMEOUT_MAX |
設定threading全域性超時時間 |
二、多執行緒類:
Thread | 一個執行執行緒的物件 |
Lock | 鎖物件 |
RLock | 可重入鎖物件,使單一執行緒(再次)獲得已持有的鎖物件(遞迴鎖) |
Condition | 條件變數物件,使得一個執行緒等待另外一個執行緒滿足特定的條件,比如改變狀態或者某個資料值 |
Event | 條件變數的通用版本,任意數量的執行緒等待某個事件的發生,在該事件發生後所有的執行緒都將被啟用 |
Semaphore | 為執行緒間的有限資源提供一個計數器,如果沒有可用資源時會被阻塞 |
BoundedSemaphore | 於Semaphore相似,不過它不允許超過初始值 |
Timer | 於Thread類似,不過它要在執行前等待一定時間 |
Barrier | 建立一個障礙,必須達到指定數量的執行緒後才可以繼續 |
1.Thread類
Thread(group = None,target = None,name = None , args = (), kwargs = {})
- group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
- target: 要執行的方法;
- name: 執行緒名;
- args/kwargs: 要傳入方法的引數
類中方法:
start() | 啟動執行緒,呼叫start(),run() |
run() | 定義執行緒的方法,經常被重寫 |
join([timeout]) | 阻塞到執行緒結束或到timeout值 |
getName() | 獲取執行緒名 |
setName() | 設定執行緒名 |
is_alive() | 返回執行緒是否正在執行 |
isDaemon() | 是否是守護執行緒(已棄用) |
setDaemon() | 設定為守護執行緒,預設為Flase |
1>.start () & run()
- start(): 啟動一個子執行緒,呼叫start()和run()
- run(): 只調用run()
2> 守護執行緒.setDaemon()
- 預設為False,執行緒在前臺執行,主執行緒執行過過程中,執行緒也在前臺執行,主執行緒執行完畢後,等待執行緒執行完成,主執行緒再停止執行
- 設定為True後,執行緒在後臺執行,主執行緒執行過程中,執行緒也在後臺執行,主執行緒執行完畢後,無論執行緒成功與否,完成與否均停止執行
例子:
1.迴圈等待最後一個執行緒(對join的操作)
import threading
import time
# 迴圈等待最後一個執行緒
def Vera(num):
print('我是一個數:%s'% num)
time.sleep(2)
t_list = []
if __name__ == '__main__':
for i in range(10):
v = threading.Thread(target=Vera,args=[i,])
v.start()
# v.join() # 執行緒序列 # 主執行緒等待子執行緒 v 執行完
t_list.append(v) # 並行的執行緒,所有的都執行完
for i in t_list: # 所有的執行緒都執行join
i.join()
print('\n我完了完了完了。。')
example 1
2.守護執行緒和join中timeout的設定
import threading
import time
def run(n):
print('第 【%s】個程序' % n)
time.sleep(3)
print('等待 【%s】秒後' % n)
join_list =[]
def main():
for i in range(5):
t = threading.Thread(target=run ,args=[i,])
t.start()
# t.join() # 設定單執行緒走
print('開始執行緒',t.getName())
join_list.append(t)
for n in join_list: # 設定每一個執行緒都等待完(多執行緒(一起完,然後再執行sleep中的秒數))
n.join()
m = threading.Thread(target=main,args =[])
m.setDaemon(True) # 守護執行緒
m.start()
m.join(timeout=2) # (子執行緒設為執行緒執行完全時)主執行緒被設為守護執行緒後 等待2秒後完
# (執行緒可以不執行完全時(即子執行緒內沒有設定join))主執行緒被設為守護執行緒後,執行緒完全執行完但不等待sleep中的時間
print('主執行緒完了。。')
example 2
2.Lock & RLock類
gil是控制同一時刻在底層執行,是鎖直譯器級別以下的鎖,只管鎖底層而不管原生執行緒自己的記憶體資料間是否互斥,Lock則是加直譯器以上的執行緒間的互斥,但是Python3.X中,貌似加了一層鎖,但官方沒有做相關解釋,但為了保險起見,我們還是最好要加一層鎖。
lock(指令鎖): 全域性
RLock(可重入鎖(遞迴鎖)):執行緒
方法:
- acquire([timeout]):嘗試獲得鎖定,使執行緒進入同步阻塞狀態
- release():釋放鎖,使用前執行緒必須獲得鎖定,否則丟擲異常
import threading
import time
def run1(): # 小鎖一號
print('我是run 1')
lock.acquire()
global num
num +=1
lock.release()
return num
def run2(): # 小鎖二號
print('我是run 2')
lock.acquire()
global num2
num2 +=1
lock.release()
return num2
def run3(): # 大鎖 鎖住了一號和二號
print('我是run 3')
lock.acquire()
res = run1() # 確保run 1 和run 2 中間沒有其他的執行
print('run 1 和 run 2')
res2 = run2()
lock.release()
print(res, res2)
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock() # 每一個鎖可以各自釋放
for i in range(10):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() !=1: # 當前還有幾個執行緒
print(threading.active_count())
else:
print('所有的執行完了。。')
print(num, num2)
遞迴鎖
Lock & RLok(對比)
import threading
lock = threading.Lock() #Lock物件
lock.acquire()
lock.acquire()
print('yes')
lock.release()
lock.release()
print(lock.acquire())
# 結果
# 發生死鎖,無線迴圈
Lock
import threading
rLock = threading.RLock() #RLock物件
rLock.acquire()
rLock.acquire()
print('yes')
rLock.release()
rLock.release()
print(rlock.acquire())
# 結果
yes
True
#在同一執行緒內,程式不會堵塞。
RLock
3.Semaphore(訊號量)&BoundSemaphore
互斥鎖:同時只允許一個執行緒更改資料
Semaphore: 同時允許多個執行緒更改資料
Semaphore 在內部管理這一個計數器,呼叫 .acquire()時 ,計數器 -1 ,呼叫 .release()時,計數器 -1,而當計數器等於0時,acquire()則阻塞,直到其他執行緒來呼叫release()
BoundSemaphore 有界訊號量會確保它當前值不超它的初始值,如果超過則丟擲valueError異常
方法:
- acquire():嘗試獲得鎖定,使執行緒進入同步阻塞狀態
- release():釋放鎖
import threading,time
def run(n):
se.acquire()
time.sleep(1)
print('執行執行緒:%s' % n)
se.release() # 釋放,訊號量 + 1
# se.release() # 再次釋放,訊號量 +1
# 當指定為.Semaphore()時,多次的訊號量 +1不會丟擲異常
# 當指定為.BoundeSemaphore()時,多次的訊號量 +1 會丟擲 ValueError 異常
if __name__ == '__main__':
num = 0
se = threading.Semaphore(3)
# se = threading.BoundedSemaphore(3)
for i in range(20):
t = threading.Thread(target=run,args=[i,])
t.start()
print('主執行緒over..')
print(num)
訊號量
4.Event類
一個執行緒通知一個事件,另一個執行緒等待通知並作出處理
方法:
- isSet(): 當內部標誌為True則返回True,否則返回False
- wait([timeout]): 不斷檢測set()是否阻塞,或者直到timeout超時
- set(): 設定內部標誌為True,所有等待的執行緒都被喚醒
- clear(): 重新設定內部標誌為False,呼叫wait()不斷對set()檢測直到set()被呼叫
import threading,time
def light():
if not event.isSet():
event.set() # 設定為Ture
s = 0 # 假裝這是設定的計時秒數
while True:
if s < 6:
print('假裝我是綠燈。。')
elif s < 8:
print('假裝我是黃燈。。')
elif s < 12:
if event.isSet():
event.clear() # 紅燈 所有車(執行緒)等待
print('假裝我是紅燈。。')
else:
s = 0 # 秒數
event.set() # 重置為 綠燈
time.sleep(2)
s += 1 # 假設的秒數 加一
def car(n):
while True:
time.sleep(1) # 每次的多個執行緒要一秒
if event.isSet():
print('車牌號【%s】,我過去啦。。啦。' % n)
else:
print('車牌號【%s】,我被塞住了。*_*。。。' % n)
event.wait() # 不斷檢測set()有沒有被置為True
if __name__ == '__main__':
event = threading.Event()
Light = threading.Thread(target= light)
Light.start()
for i in range(5): # 有 5 個車車在跑
t = threading.Thread(target=car,args=[i, ])
t.start()
車車過紅綠燈的故事
別人寫的就是好(一份導圖): https://blog.csdn.net/fz420/article/details/78958745