1. 程式人生 > >python--執行緒同步原語

python--執行緒同步原語

Threading模組是python3裡面的多執行緒模組,模組內集成了許多的類,其中包括Thread,Condition,Event,Lock,Rlock,Semaphore,Timer等等。下面這篇文章主要通過案例來說明其中的Event和Segmaphore(Boundedsegmaphore)的使用。關於Lock的使用可以移步到我之前寫的文章python同步原語--執行緒鎖。

 

Event

Event類內部儲存著一個flags引數,標誌事件的等待與否。

Event類例項函式

1. set() 將flags設定為True,事件停止阻塞

2. clear()  將flags重新設定為False,刪除flags,事件重新阻塞

3. wait() 將事件設定為等待狀態

4.is_set()判斷flags是否被設定,如果被設定返回True,否則返回False

 

(1)單個事件等待其他事件的發生

 具體程式碼:

from time import ctime,sleep
event = Event()

def event_wait():
    print(ctime())
    event.wait()
    print('這是event_wait方法中的時間',ctime())

def event_set(n):
    sleep(n)
    event.set()
    
print('這是event_set方法中的時間', ctime()) thread1 = Thread(target=event_wait) thread2 = Thread(target=event_set,args=(3,)) thread1.start() thread2.start()

結果:

Sat Nov 17 10:01:05 2018
這是event_wait方法中的時間 Sat Nov 17 10:01:08 2018
這是event_set方法中的時間  Sat Nov 17 10:01:08 2018

 

(2)多個事件先後發生

下面以賽跑來作為例子。假設5條跑道上,每條跑道各有一名運動員,分別為ABCDE。

具體程式碼:

from threading import Event
from  threading import Thread
import threading

event = Event()

def do_wait(athlete):
    racetrack = threading.current_thread().getName()
    print('%s準備就緒' % racetrack)
    event.wait()
    print('%s聽到槍聲,起跑!'%athlete)

thread1 = Thread(target=do_wait,args=("A",))
thread2 = Thread(target=do_wait,args=("B",))
thread3 = Thread(target=do_wait,args=("C",))
thread4 = Thread(target=do_wait,args=("D",))
thread5 = Thread(target=do_wait,args=("E",))

threads = []
threads.append(thread1)
threads.append(thread2)
threads.append(thread3)
threads.append(thread4)
threads.append(thread5)

for th in threads:
    th.start()

event.set()  #這個set()方法很關鍵,同時對5個執行緒中的event進行set操作

結果:

Thread-1準備就緒
Thread-2準備就緒
Thread-3準備就緒
Thread-4準備就緒
Thread-5準備就緒
E聽到槍聲,起跑!
A聽到槍聲,起跑!
B聽到槍聲,起跑!
D聽到槍聲,起跑!
C聽到槍聲,起跑!

可以看出多個執行緒中event的set()是隨機的,其內部的實現是因為一個notify_all()方法。這個方法會一次性釋放所有鎖住的事件,哪個執行緒先搶到執行緒執行的時間片,就先釋放鎖。

之所以能夠只調用一個set()函式就可以實現所有event的退出阻塞,是因為event.wait()是線上程內部實現的,而set()函式是在程序中呼叫,python多執行緒共享一個程序記憶體空間。如果是在不同程序中呼叫這兩個函式則無法實現。

 

BoundedSegmaphore

如果在主機執行IO密集型任務的時候再執行這種短時間內完成大量任務(多執行緒)的程式時,計算機就有很大可能會宕機。

這時候就可以為這段程式新增一個計數器(counter)功能,來限制一個時間點內的執行緒數量。當每次進行IO操作時,都需要向segmaphore請求資源(鎖),如果沒有請求到,就阻塞等待,請求成功才就像執行任務。

BoundedSegmaphore和Segmaphore的區別

BoundedSegmaphore請求的鎖數量固定為傳入引數,而Segmaphore請求的鎖數量可以超過傳入的引數。

主要函式:

1. acquire()  請求鎖

2. release()   釋放鎖

 

下面以一個租房的例子來說明這種固定鎖數量的機制。假設一家小公寓有6間房,原本有2個住戶在住著。

具體程式碼實現:

from threading import BoundedSemaphore,Lock,Thread
from time import sleep
from random import randrange

lock = Lock()
num = 6
hotel = BoundedSemaphore(num)

def logout():
    lock.acquire()
    print('I want to logout')
    print('A customer logout...')
    try:
        hotel.release()
        print('Welcome again')
    except ValueError:
        print('Sorry,wait a moment.')
    lock.release()

def login():
    lock.acquire()
    print('I want to login')
    print('A customer login...')
    if hotel.acquire(False):
        print('Ok,your room number is...')
    else:
        print('Sorry,our hotel is full')
    lock.release()

#房東
def producer(loops):
    for i in range(loops):
        logout()
        print('還剩%s' % hotel._value, '房間')
        sleep(randrange(2))
#租客
def consumer(loops):
    for i in range(loops):
        login()
        print('還剩%s' % hotel._value, '房間')
        sleep(randrange(2))
def main():
    print('Start')
    room_num = hotel._value
    print('The hotel is full with %s room'%room_num)
    #原本有2個住戶
    hotel.acquire()
    hotel.acquire()
    thread1 = Thread(target=producer,args=(randrange(2,8),))
    thread2 = Thread(target=consumer,args=(randrange(2,8),))
    thread1.start()
    thread2.start()

if __name__ == '__main__':
    main()

 

結果:

The hotel is full with 6 room
I want to logout
A customer logout...
Welcome again
還剩5 房間
I want to logout
A customer logout...
Welcome again
還剩6 房間
I want to login
A customer login...
Ok,your room number is...
還剩5 房間
I want to login
A customer login...
Ok,your room number is...
還剩4 房間

 

可以看出,房間數目永遠不會超過6,因為_value值(BoundedSegmaphore內部的計數器counter)一定是傳入的引數6。