1. 程式人生 > >Python_多執行緒threading模組

Python_多執行緒threading模組

 

python 在執行的時候會淡定的在CPU上只允許一個執行緒執行,故Python在多核CPU的情況下也只能發揮出單核的功能,其中的原因:gil鎖

 

gil 鎖 (全域性直譯器鎖):每個執行緒在執行時都需要先獲取gil 一個執行緒執行Python,而其他N個睡眠或者等待I/O(即 保證同一時刻只有一個執行緒丟共享資源進行存取)

 

多執行緒兩種呼叫方式:

import threading
import time

class Oh(threading.Thread):     # 繼承
    # 多執行緒繼承式呼叫
    def
__init__(self,num): threading.Thread.__init__(self) # (經典類寫法)繼承父類構造方法,否則會覆蓋父類 self.num = num def run(self): # 定義每個執行緒要執行的函式 print('我是一個數字:%s' % self.num) time.sleep(3) if __name__ == '__main__': O1 = Oh(1) O2 = Oh(2) O1.start() O2.start()
繼承式呼叫
import threading
import time

def Yes(num):     # 定義要執行的函式
    # 多執行緒直接式呼叫(常用) 
    print('列印了一個數:%s'% num)
    time.sleep(3)                # 執行完等三秒

if __name__ =='__main__':

    y1 = threading.Thread(target=Yes, args=[1,])    # 例項化建立了一個執行緒
    y2 = threading.Thread(target=Yes, args=[2,])    #
第二個執行緒 y1.start() # 執行緒開始 y2.start() print(y1.getName()) # 列印執行緒的名字 print(y2.getName()) y1.join() # 主函式等待y1執行緒執行完過後再執行主執行緒 y2.join() # 執行緒為並行,全部執行完才一起等待 3 秒 print('我是最後被列印的東東。。') # 這是主執行緒執行的最後的東東
直接式呼叫

一、多執行緒方法

threading.enumerate()
返回當前執行中活著的執行緒物件列表
threading.active_count()
返回當前處於alive狀態的執行緒物件個數(包含主執行緒),等於enumerate的列表長度
threading.current_thread() 
返回當前的執行緒物件,對應於呼叫者控制的執行緒
threading.get_ident() 
返回當前程序的‘執行緒識別符號’
threading.main_thread()
返回主執行緒物件
threading.stack_size()  
返回當建立一個新執行緒使用的執行緒棧大小,0 為預設配置
threading.TIMEOUT_MAX 
設定threading全域性超時時間
二、多執行緒類:
Thread 一個執行執行緒的物件
Lock 鎖物件
RLock 可重入鎖物件,使單一執行緒(再次)獲得已持有的鎖物件(遞迴鎖)
Condition 條件變數物件,使得一個執行緒等待另外一個執行緒滿足特定的條件,比如改變狀態或者某個資料值
Event 條件變數的通用版本,任意數量的執行緒等待某個事件的發生,在該事件發生後所有的執行緒都將被啟用       
Semaphore 為執行緒間的有限資源提供一個計數器,如果沒有可用資源時會被阻塞
BoundedSemaphore 於Semaphore相似,不過它不允許超過初始值
Timer 於Thread類似,不過它要在執行前等待一定時間
Barrier 建立一個障礙,必須達到指定數量的執行緒後才可以繼續

 

 

1.Thread類


Thread(group = None,target = None,name = None , args = (), kwargs = {})

  • group: 執行緒組,目前還沒有實現,庫引用中提示必須是None; 
  • target: 要執行的方法; 
  • name: 執行緒名; 
  • args/kwargs: 要傳入方法的引數

類中方法:

start() 啟動執行緒,呼叫start(),run()
run() 定義執行緒的方法,經常被重寫
join([timeout]) 阻塞到執行緒結束或到timeout值                      
getName() 獲取執行緒名
setName() 設定執行緒名
is_alive() 返回執行緒是否正在執行
isDaemon() 是否是守護執行緒(已棄用)
setDaemon() 設定為守護執行緒,預設為Flase
 

1>.start ()  & run()

  • start(): 啟動一個子執行緒,呼叫start()和run()
  • run(): 只調用run()

2> 守護執行緒.setDaemon()

  • 預設為False,執行緒在前臺執行,主執行緒執行過過程中,執行緒也在前臺執行,主執行緒執行完畢後,等待執行緒執行完成,主執行緒再停止執行
  • 設定為True後,執行緒在後臺執行,主執行緒執行過程中,執行緒也在後臺執行,主執行緒執行完畢後,無論執行緒成功與否,完成與否均停止執行

例子:

1.迴圈等待最後一個執行緒(對join的操作)

import threading
import time

# 迴圈等待最後一個執行緒

def Vera(num):

    print('我是一個數:%s'% num)
    time.sleep(2)

t_list = []
if __name__ == '__main__':
    for i in range(10):
        v = threading.Thread(target=Vera,args=[i,])
        v.start()
        # v.join()    # 執行緒序列 # 主執行緒等待子執行緒 v 執行完

        t_list.append(v)         # 並行的執行緒,所有的都執行完
    for i in t_list:    # 所有的執行緒都執行join
        i.join()
    print('\n我完了完了完了。。')
example 1

2.守護執行緒和join中timeout的設定

import threading
import time

def run(n):
    print('第 【%s】個程序' % n)
    time.sleep(3)
    print('等待 【%s】秒後' % n)
join_list =[]
def main():
    for i in range(5):
        t = threading.Thread(target=run ,args=[i,])
        t.start()
        # t.join()    # 設定單執行緒走
        print('開始執行緒',t.getName())
        join_list.append(t)
    for n in join_list:    # 設定每一個執行緒都等待完(多執行緒(一起完,然後再執行sleep中的秒數))
        n.join()

m = threading.Thread(target=main,args =[])
m.setDaemon(True)    # 守護執行緒
m.start()
m.join(timeout=2)    # (子執行緒設為執行緒執行完全時)主執行緒被設為守護執行緒後 等待2秒後完
# (執行緒可以不執行完全時(即子執行緒內沒有設定join))主執行緒被設為守護執行緒後,執行緒完全執行完但不等待sleep中的時間
print('主執行緒完了。。')
example 2

 

2.Lock & RLock類


gil是控制同一時刻在底層執行,是鎖直譯器級別以下的鎖,只管鎖底層而不管原生執行緒自己的記憶體資料間是否互斥,Lock則是加直譯器以上的執行緒間的互斥,但是Python3.X中,貌似加了一層鎖,但官方沒有做相關解釋,但為了保險起見,我們還是最好要加一層鎖。

lock(指令鎖): 全域性

RLock(可重入鎖(遞迴鎖)):執行緒

方法:

  • acquire([timeout]):嘗試獲得鎖定,使執行緒進入同步阻塞狀態
  • release():釋放鎖,使用前執行緒必須獲得鎖定,否則丟擲異常
指令鎖
import threading
import time 

def run1():   # 小鎖一號
    print('我是run 1')
    lock.acquire()
    global num
    num +=1
    lock.release()
    return num
def run2():           # 小鎖二號
    print('我是run 2')
    lock.acquire()
    global num2
    num2 +=1
    lock.release()
    return num2
def run3():         # 大鎖 鎖住了一號和二號
    print('我是run 3')
    lock.acquire()
    res = run1()       # 確保run 1 和run 2 中間沒有其他的執行
    print('run 1 和 run 2')
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == '__main__':

    num, num2 = 0, 0
    lock = threading.RLock()     # 每一個鎖可以各自釋放
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() !=1:   # 當前還有幾個執行緒
    print(threading.active_count())
else:
    print('所有的執行完了。。')
    print(num, num2)
遞迴鎖

Lock & RLok(對比)

import threading
lock = threading.Lock() #Lock物件
lock.acquire()
lock.acquire()  
print('yes')
lock.release()
lock.release()
print(lock.acquire())



# 結果

# 發生死鎖,無線迴圈
Lock
import threading
rLock = threading.RLock()      #RLock物件
rLock.acquire()
rLock.acquire() 
print('yes')
rLock.release()
rLock.release()
print(rlock.acquire())


# 結果

yes
True

#在同一執行緒內,程式不會堵塞。
RLock

 

3.Semaphore(訊號量)&BoundSemaphore


互斥鎖:同時只允許一個執行緒更改資料

Semaphore: 同時允許多個執行緒更改資料

Semaphore 在內部管理這一個計數器,呼叫 .acquire()時 ,計數器 -1 ,呼叫 .release()時,計數器 -1,而當計數器等於0時,acquire()則阻塞,直到其他執行緒來呼叫release()

BoundSemaphore 有界訊號量會確保它當前值不超它的初始值,如果超過則丟擲valueError異常

方法:

  • acquire():嘗試獲得鎖定,使執行緒進入同步阻塞狀態
  • release():釋放鎖
import threading,time

def run(n):
    se.acquire()
    time.sleep(1)
    print('執行執行緒:%s' % n)
    se.release()       # 釋放,訊號量 + 1
    # se.release()    # 再次釋放,訊號量 +1
    # 當指定為.Semaphore()時,多次的訊號量 +1不會丟擲異常
    # 當指定為.BoundeSemaphore()時,多次的訊號量 +1 會丟擲 ValueError 異常

if __name__ == '__main__':
    num = 0

    se = threading.Semaphore(3)
    # se = threading.BoundedSemaphore(3)
    for i in range(20):
        t = threading.Thread(target=run,args=[i,])
        t.start()

    print('主執行緒over..')
    print(num)
訊號量

 

4.Event類


一個執行緒通知一個事件,另一個執行緒等待通知並作出處理

方法:

  • isSet(): 當內部標誌為True則返回True,否則返回False
  • wait([timeout]): 不斷檢測set()是否阻塞,或者直到timeout超時
  • set(): 設定內部標誌為True,所有等待的執行緒都被喚醒
  • clear(): 重新設定內部標誌為False,呼叫wait()不斷對set()檢測直到set()被呼叫
import  threading,time

def light():
    if not event.isSet():
        event.set()        # 設定為Ture

    s = 0         # 假裝這是設定的計時秒數
    while True:
        if s < 6:
            print('假裝我是綠燈。。')

        elif s < 8:
            print('假裝我是黃燈。。')

        elif s < 12:
            if event.isSet():
                event.clear()            # 紅燈 所有車(執行緒)等待
            print('假裝我是紅燈。。')
        else:
            s = 0         # 秒數
            event.set()  # 重置為 綠燈

        time.sleep(2)
        s += 1        # 假設的秒數  加一


def car(n):
    while True:
        time.sleep(1)          # 每次的多個執行緒要一秒
        if event.isSet():
            print('車牌號【%s】,我過去啦。。啦。' % n)
        else:
            print('車牌號【%s】,我被塞住了。*_*。。。' % n)
            event.wait()    # 不斷檢測set()有沒有被置為True


if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target= light)
    Light.start()
    for i in range(5):     # 有 5 個車車在跑
        t = threading.Thread(target=car,args=[i, ])
        t.start()
車車過紅綠燈的故事


別人寫的就是好(一份導圖): https://blog.csdn.net/fz420/article/details/78958745