1. 程式人生 > 實用技巧 >(七)多執行緒之(訊號量,Event,定時器)

(七)多執行緒之(訊號量,Event,定時器)

一、訊號量(Semaphore)


訊號量也是一把鎖,可以指定訊號量為5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,訊號量同一時間可以有5個任務拿到鎖去執行,如果說互斥鎖是合租房屋的人去搶一個廁所,那麼訊號量就相當於一群路人爭搶公共廁所,公共廁所有多個坑位,這意味著同一時間可以有多個人上公共廁所,但公共廁所容納的人數是一定的,這便是訊號量的大小。

from threading import Thread,Semaphore,currentThread
import time,random

# 同時只有3個執行緒可以獲得semaphore,即可以限制最大連線數為3
s = Semaphore(3)    #
坑位 def task(): # s.acquire() # print("%s come in." % currentThread().getName()) # s.release() with s: # 上下文管理,可以省略加鎖和釋放,效果和上面一樣 print("%s come in." % currentThread().getName()) time.sleep(random.randint(1,3)) # 模擬每個人上的時間,每個人時間不同 if __name__ == '__main__':
for i in range(10): t = Thread(target=task) t.start()

解析:

# Semaphore管理一個內建的計數器,
# 每當呼叫acquire()時內建計數器-1;
# 呼叫release() 時內建計數器+1;
# 計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。

與程序池是完全不同的概念,程序池Pool(4),最大隻能產生4個程序,而且從頭到尾都只是這四個程序,不會產生新的,而訊號量是產生一堆執行緒/程序。

二、Event


執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。

如果程式中的其他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。

為了解決這些問題,我們需要使用threading庫中的Event物件。

物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。

在初始情況下,Event物件中的訊號標誌被設定為假。

如果有執行緒等待一個Event物件,而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。

一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。

如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件,繼續執行。

from threading import Event

# event.isSet():返回event的狀態值;

# event.wait():如果 event.isSet()==False將阻塞執行緒;

# event.set():設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;

# event.clear():恢復event的狀態值為False。
from threading import Thread,Event
import time
# event無非就是實現了(執行緒之間的同步),一個執行緒通知另外一個執行緒,說我已經準備好我的活了,你可以接著執行你其他的任務了

event = Event()
# event.wait()    # 在原地等
# event.set()     # 這個執行算是等完了,發這個訊號了,上面的wait就不用等著了

def student(name):
    print("學生%s正在聽課" % name)
    event.wait()    # 只要沒有人下發下課的訊號,就一直卡著
    print("學生%s課間活動" % name)

def teacher(name):
    print("老師%s正在授課" % name)
    time.sleep(7)
    event.set()     # 發訊號

if __name__ == '__main__':
    stu1 = Thread(target=student,args=("娃娃魚",))
    stu2 = Thread(target=student,args=("託兒所",))
    stu3 = Thread(target=student,args=("兒童劫",))
    tea1 = Thread(target=teacher,args=("菊花信",))

    stu1.start()
    stu2.start()
    stu3.start()
    tea1.start()
event.wait()與event.set()用法
from threading import Thread,Event
import time

event = Event()

def student(name):
    print("學生%s正在聽課" % name)
    event.wait(2)    # 裡面可以設定超時時間,沒必要等到睡7秒
    print("學生%s課間活動" % name)

def teacher(name):
    print("老師%s正在授課" % name)
    time.sleep(7)
    event.set()     # 發訊號

if __name__ == '__main__':
    stu1 = Thread(target=student,args=("娃娃魚",))
    stu2 = Thread(target=student,args=("託兒所",))
    stu3 = Thread(target=student,args=("兒童劫",))
    tea1 = Thread(target=teacher,args=("菊花信",))

    stu1.start()
    stu2.start()
    stu3.start()
    tea1.start()
event.wait(2)

例如,有多個工作執行緒嘗試連結MySQL,我們想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作:

from threading import Thread,Event,currentThread
import time

def conn_mysql():
    n = 1
    while not event.is_set():
        if n > 3:
            raise TimeoutError("連結超時...")

        print("<%s>第%s次嘗試連結..." % (currentThread().getName(),n))
        event.wait(0.5)
        n += 1
        print("<%s> 連結成功..." % currentThread().getName())


def check_mysql():
    print("\33[36;1m<%s>正在檢查...\33[0m" % currentThread().getName())
    time.sleep(5)   # 模擬檢測時間
    event.set()

if __name__ == '__main__':
    event = Event()
    for i in range(3):  # 啟三個執行緒
        t = Thread(target=conn_mysql)
        t.start()
    th = Thread(target=check_mysql)
    th.start()

三、定時器


定時器:隔多長時間以後觸發一個任務的執行。

from threading import Timer

def task(name):
    print("hello %s." % name)

# 裡面有幾個引數,interval代表時間間隔(秒級),function代表隔了多少秒之後要執行的那個函式,args,kwargs
t = Timer(5,task,args=("託兒所",))
t.start()
# 5s 後執行 hello 託兒所.

驗證碼實現:

# 驗證碼
from threading import Timer
import random

class Code:
    def __init__(self):
        self.make_cache()   # init先執行,立馬拿到一個快取的驗證碼
        self.check()

    def make_cache(self,interval=60):
        """快取的功能,快取每次產生的驗證碼"""
        self.cache = self.make_code()  # 生成的驗證碼快取下來
        print(self.cache)
        self.t = Timer(interval,self.make_cache)    # 每隔60s重新執行一遍,又重新整理一遍原來的快取
        self.t.start()

    def make_code(self,n=4):
        res = ""
        for i in range(n):  # 代表驗證碼的位數
            s1 = str(random.randint(0,9))    # 隨機數轉成 str,是為了一會要拼起來
            s2 = chr(random.randint(65,90))  # 65~90的數字,正好是 ASCII表的小寫字母到大寫字母
            res += random.choice([s1,s2])      # 把s1,s2串起來
        return res

    def check(self):
        while True:
            code = input("請輸入你的驗證碼:").strip().upper()
            if code == self.cache:
                print("驗證碼輸入正確。")
                self.t.cancel()
                break
            else:
                print("驗證碼輸入錯誤,請重新輸入。")
                continue

if __name__ == '__main__':
    obj = Code()