1. 程式人生 > >Python_多線程threading模塊

Python_多線程threading模塊

list src 計數 通用版本 args play ide 信號量 後臺運行

python 在執行的時候會淡定的在CPU上只允許一個線程運行,故Python在多核CPU的情況下也只能發揮出單核的功能,其中的原因:gil鎖

gil 鎖 (全局解釋器鎖):每個線程在執行時都需要先獲取gil 一個線程運行Python,而其他N個睡眠或者等待I/O(即 保證同一時刻只有一個線程丟共享資源進行存取)

多線程兩種調用方式:

技術分享圖片
import threading
import time

class Oh(threading.Thread):     # 繼承
    # 多線程繼承式調用
    def __init__(self,num):
        threading.Thread.
__init__(self) # (經典類寫法)繼承父類構造方法,否則會覆蓋父類 self.num = num def run(self): # 定義每個線程要運行的函數 print(我是一個數字:%s % self.num) time.sleep(3) if __name__ == __main__: O1 = Oh(1) O2 = Oh(2) O1.start() O2.start()
繼承式調用 技術分享圖片
import threading
import time

def Yes(num): # 定義要運行的函數 # 多線程直接式調用(常用) print(打印了一個數:%s% num) time.sleep(3) # 執行完等三秒 if __name__ ==__main__: y1 = threading.Thread(target=Yes, args=[1,]) # 實例化創建了一個線程 y2 = threading.Thread(target=Yes, args=[2,]) # 第二個線程 y1.start() # 線程開始 y2.start()
print(y1.getName()) # 打印線程的名字 print(y2.getName()) y1.join() # 主函數等待y1線程執行完過後再執行主線程 y2.join() # 線程為並行,全部執行完才一起等待 3 秒 print(我是最後被打印的東東。。) # 這是主線程執行的最後的東東
直接式調用

一、多線程方法

threading.enumerate()
返回當前運行中活著的線程對象列表
threading.active_count()
返回當前處於alive狀態的線程對象個數(包含主線程),等於enumerate的列表長度
threading.current_thread() 
返回當前的線程對象,對應於調用者控制的線程
threading.get_ident() 
返回當前進程的‘線程標識符’
threading.main_thread()
返回主線程對象
threading.stack_size()  
返回當創建一個新線程使用的線程棧大小,0 為默認配置
threading.TIMEOUT_MAX 
設置threading全局超時時間
二、多線程類:
Thread 一個執行線程的對象
Lock 鎖對象
RLock 可重入鎖對象,使單一線程(再次)獲得已持有的鎖對象(遞歸鎖)
Condition 條件變量對象,使得一個線程等待另外一個線程滿足特定的條件,比如改變狀態或者某個數據值
Event 條件變量的通用版本,任意數量的線程等待某個事件的發生,在該事件發生後所有的線程都將被激活
Semaphore 為線程間的有限資源提供一個計數器,如果沒有可用資源時會被阻塞
BoundedSemaphore 於Semaphore相似,不過它不允許超過初始值
Timer 於Thread類似,不過它要在運行前等待一定時間
Barrier 創建一個障礙,必須達到指定數量的線程後才可以繼續

1.Thread類


Thread(group = None,target = None,name = None , args = (), kwargs = {})

  • group: 線程組,目前還沒有實現,庫引用中提示必須是None;
  • target: 要執行的方法;
  • name: 線程名;
  • args/kwargs: 要傳入方法的參數

類中方法:

start() 啟動線程,調用start(),run()
run() 定義線程的方法,經常被重寫
join([timeout]) 阻塞到線程結束或到timeout值
getName() 獲取線程名
setName() 設置線程名
is_alive() 返回線程是否正在運行
isDaemon() 是否是守護線程(已棄用)
setDaemon() 設置為守護線程,默認為Flase

1>.start () & run()

  • start(): 啟動一個子線程,調用start()和run()
  • run(): 只調用run()

2> 守護線程.setDaemon()

  • 默認為False,線程在前臺運行,主線程執行過過程中,線程也在前臺執行,主線程執行完畢後,等待線程執行完成,主線程再停止執行
  • 設置為True後,線程在後臺運行,主線程執行過程中,線程也在後臺執行,主線程執行完畢後,無論線程成功與否,完成與否均停止執行

例子:

1.循環等待最後一個線程(對join的操作)

技術分享圖片
import threading
import time

# 循環等待最後一個線程

def Vera(num):

    print(我是一個數:%s% num)
    time.sleep(2)

t_list = []
if __name__ == __main__:
    for i in range(10):
        v = threading.Thread(target=Vera,args=[i,])
        v.start()
        # v.join()    # 線程串行 # 主線程等待子線程 v 執行完

        t_list.append(v)         # 並行的線程,所有的都執行完
    for i in t_list:    # 所有的線程都執行join
        i.join()
    print(\n我完了完了完了。。)
example 1

2.守護線程和join中timeout的設置

技術分享圖片
import threading
import time

def run(n):
    print(第 【%s】個進程 % n)
    time.sleep(3)
    print(等待 【%s】秒後 % n)
join_list =[]
def main():
    for i in range(5):
        t = threading.Thread(target=run ,args=[i,])
        t.start()
        # t.join()    # 設置單線程走
        print(開始線程,t.getName())
        join_list.append(t)
    for n in join_list:    # 設置每一個線程都等待完(多線程(一起完,然後再執行sleep中的秒數))
        n.join()

m = threading.Thread(target=main,args =[])
m.setDaemon(True)    # 守護線程
m.start()
m.join(timeout=2)    # (子線程設為線程執行完全時)主線程被設為守護線程後 等待2秒後完
# (線程可以不執行完全時(即子線程內沒有設置join))主線程被設為守護線程後,線程完全執行完但不等待sleep中的時間
print(主線程完了。。)
example 2

2.Lock & RLock類


gil是控制同一時刻在底層執行,是鎖解釋器級別以下的鎖,只管鎖底層而不管原生線程自己的內存數據間是否互斥,Lock則是加解釋器以上的線程間的互斥,但是Python3.X中,貌似加了一層鎖,但官方沒有做相關解釋,但為了保險起見,我們還是最好要加一層鎖。

lock(指令鎖): 全局

RLock(可重入鎖(遞歸鎖)):線程

方法:

  • acquire([timeout]):嘗試獲得鎖定,使線程進入同步阻塞狀態
  • release():釋放鎖,使用前線程必須獲得鎖定,否則拋出異常
技術分享圖片指令鎖 技術分享圖片
import threading
import time 

def run1():   # 小鎖一號
    print(我是run 1)
    lock.acquire()
    global num
    num +=1
    lock.release()
    return num
def run2():           # 小鎖二號
    print(我是run 2)
    lock.acquire()
    global num2
    num2 +=1
    lock.release()
    return num2
def run3():         # 大鎖 鎖住了一號和二號
    print(我是run 3)
    lock.acquire()
    res = run1()       # 確保run 1 和run 2 中間沒有其他的運行
    print(run 1 和 run 2)
    res2 = run2()
    lock.release()
    print(res, res2)

if __name__ == __main__:

    num, num2 = 0, 0
    lock = threading.RLock()     # 每一個鎖可以各自釋放
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() !=1:   # 當前還有幾個線程
    print(threading.active_count())
else:
    print(所有的執行完了。。)
    print(num, num2)
遞歸鎖

Lock & RLok(對比)

技術分享圖片
import threading
lock = threading.Lock() #Lock對象
lock.acquire()
lock.acquire()  
print(yes)
lock.release()
lock.release()
print(lock.acquire())



# 結果

# 發生死鎖,無線循環
Lock 技術分享圖片
import threading
rLock = threading.RLock()      #RLock對象
rLock.acquire()
rLock.acquire() 
print(yes)
rLock.release()
rLock.release()
print(rlock.acquire())


# 結果

yes
True

#在同一線程內,程序不會堵塞。
RLock

3.Semaphore(信號量)&BoundSemaphore


互斥鎖:同時只允許一個線程更改數據

Semaphore: 同時允許多個線程更改數據

Semaphore 在內部管理這一個計數器,調用 .acquire()時 ,計數器 -1 ,調用 .release()時,計數器 -1,而當計數器等於0時,acquire()則阻塞,直到其他線程來調用release()

BoundSemaphore 有界信號量會確保它當前值不超它的初始值,如果超過則拋出valueError異常

方法:

  • acquire():嘗試獲得鎖定,使線程進入同步阻塞狀態
  • release():釋放鎖
技術分享圖片
import threading,time

def run(n):
    se.acquire()
    time.sleep(1)
    print(運行線程:%s % n)
    se.release()       # 釋放,信號量 + 1
    # se.release()    # 再次釋放,信號量 +1
    # 當指定為.Semaphore()時,多次的信號量 +1不會拋出異常
    # 當指定為.BoundeSemaphore()時,多次的信號量 +1 會拋出 ValueError 異常

if __name__ == __main__:
    num = 0

    se = threading.Semaphore(3)
    # se = threading.BoundedSemaphore(3)
    for i in range(20):
        t = threading.Thread(target=run,args=[i,])
        t.start()

    print(主線程over..)
    print(num)
信號量

4.Event類


一個線程通知一個事件,另一個線程等待通知並作出處理

方法:

  • isSet(): 當內部標誌為True則返回True,否則返回False
  • wait([timeout]): 不斷檢測set()是否阻塞,或者直到timeout超時
  • set(): 設置內部標誌為True,所有等待的線程都被喚醒
  • clear(): 重新設置內部標誌為False,調用wait()不斷對set()檢測直到set()被調用
技術分享圖片
import  threading,time

def light():
    if not event.isSet():
        event.set()        # 設置為Ture

    s = 0         # 假裝這是設置的計時秒數
    while True:
        if s < 6:
            print(假裝我是綠燈。。)

        elif s < 8:
            print(假裝我是黃燈。。)

        elif s < 12:
            if event.isSet():
                event.clear()            # 紅燈 所有車(線程)等待
            print(假裝我是紅燈。。)
        else:
            s = 0         # 秒數
            event.set()  # 重置為 綠燈

        time.sleep(2)
        s += 1        # 假設的秒數  加一


def car(n):
    while True:
        time.sleep(1)          # 每次的多個線程要一秒
        if event.isSet():
            print(車牌號【%s】,我過去啦。。啦。 % n)
        else:
            print(車牌號【%s】,我被塞住了。*_*。。。 % n)
            event.wait()    # 不斷檢測set()有沒有被置為True


if __name__ == __main__:
    event = threading.Event()
    Light = threading.Thread(target= light)
    Light.start()
    for i in range(5):     # 有 5 個車車在跑
        t = threading.Thread(target=car,args=[i, ])
        t.start()
車車過紅綠燈的故事


別人寫的就是好(一份導圖): https://blog.csdn.net/fz420/article/details/78958745

Python_多線程threading模塊