1. 程式人生 > >鎖問題與線程queue

鎖問題與線程queue

run pre self 了解 hello 不同 es2017 lose 情況下

一、同步鎖

1、join與互斥鎖

線程搶的是GIL鎖,GIL鎖相當於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發現Lock仍然沒有被釋放則阻塞,即便是拿到執行權限GIL也要立刻交出來

join是等待所有,即整體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓並發變成串行,join與互斥鎖都可以實現,毫無疑問,互斥鎖的部分串行效率要更高

2、GIL VS Lock

鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據。結論:保護不同的數據就應該加不同的鎖。

GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

分析:

1)100個線程去搶GIL鎖,即搶執行權限
2) 肯定有一個線程先搶到GIL(暫且稱為線程1),然後開始執行,一旦執行就會拿到lock.acquire()
3)極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL
4)直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的線程再重復2 3 4的過程

3、join與互斥鎖對比實例

1)未處理代碼:

技術分享 初始

2)加互斥鎖:

技術分享 Lock

3)join效果

技術分享 join

即在start之後立刻使用jion,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是start後立即join:任務內的所有代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.


二、死鎖現象與遞歸鎖

1、死鎖現象

進程也有死鎖與遞歸鎖與線程中相同

死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程

技術分享
from threading import Lock,Thread
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(‘\033[31m%s 拿到A鎖‘ %self.name)
        mutexB.acquire()
        print(‘\033[32m%s 拿到B鎖‘ %self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(‘\033[33m%s 拿到B鎖‘ %self.name)
        time.sleep(1)
        mutexA.acquire()
        print(‘\033[34m%s 拿到A鎖‘ %self.name)
        mutexA.release()
        mutexB.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t=MyThread()
        t.start()
技術分享

死鎖狀態,程序永遠無法結束:

技術分享

2、遞歸鎖

上述情況可以用遞歸鎖解決

遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

技術分享
from threading import ,Thread,RLock
import time
mutexB=mutexA=RLock()
#一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(‘\033[31m%s 拿到A鎖‘ %self.name)
        mutexB.acquire()
        print(‘\033[32m%s 拿到B鎖‘ %self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(‘\033[33m%s 拿到B鎖‘ %self.name)
        time.sleep(1)
        mutexA.acquire()
        print(‘\033[34m%s 拿到A鎖‘ %self.name)
        mutexA.release()
        mutexB.release()

if __name__ == ‘__main__‘:
    for i in range(10):
        t=MyThread()
        t.start()
技術分享

三、信號量Semaphore

Semaphore也是一種鎖不過這個鎖可以自己定義同時可以進入鎖的線程數

Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;

計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:

1、互斥鎖Lock就像家裏的廁所每次只能進一人,進去後鎖門其他人在外面等著(這是學進程互斥鎖時的例子)

技術分享
from multiprocessing import Process,Lock,current_process
import time,random
def work(mutex):
    mutex.acquire()  #上鎖
    print(‘%s 上廁所‘ %current_process().name)
    time.sleep(random.randint(1,3))
    print(‘%s 走了‘ %current_process().name)
    mutex.release()  #開鎖

if __name__ == ‘__main__‘:
    mutex=Lock()  #實例化(互斥鎖)
    print(‘start...‘)
    for i in range(20):
        t=Process(target=work,args=(mutex,))
        t.start()
技術分享

2、信號量Semaphore就像是街道的公共廁所有固定個數的隔間(例如5個),剛開始可以進去5個,然後出來幾個便可以再進去幾個

技術分享
from threading import Thread,Semaphore,currentThread
import time,random
sm=Semaphore(5)
def task():
    sm.acquire()
    print(‘%s 上廁所‘ %currentThread().getName())
    time.sleep(random.randint(1,3))
    print(‘%s 走了‘ %currentThread().getName())
    sm.release()
if __name__ == ‘__main__‘:
    for i in range(20):
        t=Thread(target=task)
        t.start()
技術分享

與進程池相似但是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程


四、Event

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False。

1、模擬紅綠燈

技術分享
from threading import Thread,Event,currentThread
import time
e=Event()

def traffic_lights():
    time.sleep(5)
    e.set()

def car():
    print(‘\033[41m%s 等‘ %currentThread().getName())
    e.wait()
    print(‘\033[42m%s 跑‘ %currentThread().getName())


if __name__ == ‘__main__‘:
    for i in range(10):
        t=Thread(target=car)
        t.start()
    traffic_thread=Thread(target=traffic_lights)
    traffic_thread.start()
技術分享

2、有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那麽我們就可以采用threading.Event機制來協調各個工作線程的連接操作

技術分享
from threading import Thread,Event,currentThread
import time
e=Event()
def conn_mysql():
    count=1
    while not e.is_set():
        if count > 3:
            raise ConnectionError(‘嘗試鏈接的次數過多‘)
        print(‘\033[35m%s 第%s次嘗試‘ %(currentThread().getName(),count))
        e.wait(timeout=1)
        count+=1
    print(‘\033[32m%s 開始鏈接‘ %currentThread().getName())

def check_mysql():
    print(‘\033[34m%s 檢測mysql...‘ %currentThread().getName())
    time.sleep(2)
    e.set()
if __name__ == ‘__main__‘:
    for i in range(3):
        t=Thread(target=conn_mysql)
        t.start()
    t=Thread(target=check_mysql)
    t.start()
技術分享

五、定時器

定時器,指定n秒後執行某操作

技術分享
from threading import Timer

def hello(n):
    print("hello, world",n)

#三秒後運行hello函數傳入參數123
t = Timer(3, hello, args=(123,))
t.start()
技術分享

六、線程queue

queue隊列 :使用import queue,用法與進程Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

隊列在線程編程中尤其有用,因為必須在多個線程之間安全地交換信息。

1、queue.Queue() 先進先出

技術分享
import queue

q=queue.Queue()
q.put(‘first‘)
q.put(‘second‘)
q.put(‘third‘)

print(q.get())
print(q.get())
print(q.get())
‘‘‘
結果(先進先出):
first
second
third
‘‘‘
技術分享

2、queue.LifoQueue() 後進先出

技術分享
import queue

q=queue.LifoQueue()
q.put(‘first‘)
q.put(‘second‘)
q.put(‘third‘)

print(q.get())
print(q.get())
print(q.get())
‘‘‘
結果(後進先出):
third
second
first
‘‘‘
技術分享

3、queue.PriorityQueue() 存儲數據時可設置優先級的隊列

技術分享
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,‘a‘))
q.put((10,‘b‘))
q.put((30,‘c‘))

print(q.get())
print(q.get())
print(q.get())
‘‘‘
結果(數字越小優先級越高,優先級高的優先出隊):
(10, ‘b‘)
(20, ‘a‘)
(30, ‘c‘)
‘‘‘
技術分享

鎖問題與線程queue