python 實現執行緒之間的通訊示例
前言:因為GIL的限制,python的執行緒是無法真正意義上並行的。相對於非同步程式設計,其效能可以說不是一個等量級的。為什麼我們還要學習多執行緒程式設計呢,雖然說非同步程式設計好處多,但程式設計也較為複雜,邏輯不容易理解,學習成本和維護成本都比較高。畢竟我們大部分人還是適應同步編碼的,除非一些需要高效能處理的地方採用非同步。
首先普及下程序和執行緒的概念:
程序:程序是作業系統資源分配的基本單位。
執行緒:執行緒是任務排程和執行的基本單位。
一個應用程式至少一個程序,一個程序至少一個執行緒。
兩者區別:同一程序內的執行緒共享本程序的資源如記憶體、I/O、cpu等,但是程序之間的資源是獨立的。
一、多執行緒
python 可以通過 thread 或 threading 模組實現多執行緒,threading 相比 thread 提供了更高階、更全面的執行緒管理。我們下文主要以 threading 模組介紹多執行緒的基本用法。
import threading import time class thread(threading.Thread): def __init__(self,threadname): threading.Thread.__init__(self,name='執行緒' + threadname) def run(self): print('%s:Now timestamp is %s'%(self.name,time.time())) threads = [] for a in range(int(5)): # 執行緒個數 threads.append(thread(str(a))) for t in threads: # 開啟執行緒 t.start() for t in threads: # 阻塞執行緒 t.join() print('END')
輸出:
執行緒3:Now timestamp is 1557386184.7574518
執行緒2:Now timestamp is 1557386184.7574518
執行緒0:Now timestamp is 1557386184.7574518
執行緒1:Now timestamp is 1557386184.7574518
執行緒4:Now timestamp is 1557386184.7582724
END
start() 方法開啟子執行緒。執行多次 start() 方法代表開啟多個子執行緒。
join() 方法用來阻塞主執行緒,等待子執行緒執行完成。舉個例子,主執行緒A建立了子執行緒B,並使用了 join() 方法,主執行緒A在 join() 處就被阻塞了,等待子執行緒B完成後,主執行緒A才能執行 print('END')。如果沒有使用 join() 方法,主執行緒A建立子執行緒B後,不會等待子執行緒B,直接執行 print('END'),如下:
import threading import time class thread(threading.Thread): def __init__(self,name='執行緒' + threadname) def run(self): time.sleep(1) print('%s:Now timestamp is %s'%(self.name,time.time())) threads = [] for a in range(int(5)): # 執行緒個數 threads.append(thread(str(a))) for t in threads: # 開啟執行緒 t.start() # for t in threads: # 阻塞執行緒 # t.join() print('END')
輸出:
END
執行緒0:Now timestamp is 1557386321.376941
執行緒3:Now timestamp is 1557386321.377937
執行緒1:Now timestamp is 1557386321.377937
執行緒2:Now timestamp is 1557386321.377937
執行緒4:Now timestamp is 1557386321.377937
二、執行緒之間的通訊
1.threading.Lock()
如果多個執行緒對某一資源同時進行修改,可能會存在不可預知的情況。為了修改資料的正確性,需要把這個資源鎖住,只允許執行緒依次排隊進去獲取這個資源。當執行緒A操作完後,釋放鎖,執行緒B才能進入。如下指令碼是開啟多個執行緒修改變數的值,但輸出結果每次都不一樣。
import threading money = 0 def Order(n): global money money = money + n money = money - n class thread(threading.Thread): def __init__(self,name='執行緒' + threadname) self.threadname = int(threadname) def run(self): for i in range(1000000): Order(self.threadname) t1 = thread('1') t2 = thread('5') t1.start() t2.start() t1.join() t2.join() print(money)
接下來我們用 threading.Lock() 鎖住這個變數,等操作完再釋放這個鎖。lock.acquire() 給資源加一把鎖,對資源處理完成之後,lock.release() 再釋放鎖。以下指令碼執行結果都是一樣的,但速度會變慢,因為執行緒只能一個個的通過。
import threading money = 0 def Order(n): global money money = money + n money = money - n class thread(threading.Thread): def __init__(self,name='執行緒' + threadname) self.threadname = int(threadname) def run(self): for i in range(1000000): lock.acquire() Order(self.threadname) lock.release() # print('%s:Now timestamp is %s'%(self.name,time.time())) lock = threading.Lock() t1 = thread('1') t2 = thread('5') t1.start() t2.start() t1.join() t2.join() print(money)
2.threading.Rlock()
用法和 threading Lock() 一致,區別是 threading.Rlock() 允許多次鎖資源,acquire() 和 release() 必須成對出現,也就是說加了幾把鎖就得釋放幾把鎖。
lock = threading.Lock() # 死鎖 lock.acquire() lock.acquire() print('...') lock.release() lock.release() rlock = threading.RLock() # 同一執行緒內不會阻塞執行緒 rlock.acquire() rlock.acquire() print('...') rlock.release() rlock.release()
3.threading.Condition()
threading.Condition() 可以理解為更加高階的鎖,比 Lock 和 Rlock 的用法更高階,能處理一些複雜的執行緒同步問題。threading.Condition() 建立一把資源鎖(預設是Rlock),提供 acquire() 和 release() 方法,用法和 Rlock 一致。此外 Condition 還提供 wait()、Notify() 和 NotifyAll() 方法。
wait():執行緒掛起,直到收到一個 Notify() 通知或者超時(可選引數),wait() 必須線上程得到 Rlock 後才能使用。
Notify() :線上程掛起的時候,傳送一個通知,讓 wait() 等待執行緒繼續執行,Notify() 也必須線上程得到 Rlock 後才能使用。 Notify(n=1),最多喚醒 n 個執行緒。
NotifyAll() :線上程掛起的時候,傳送通知,讓所有 wait() 阻塞的執行緒都繼續執行。
舉例說明下 Condition() 使用
import threading,time def TestA(): cond.acquire() print('李白:看見一個敵人,請求支援') cond.wait() print('李白:好的') cond.notify() cond.release() def TestB(): time.sleep(2) cond.acquire() print('亞瑟:等我...') cond.notify() cond.wait() print('亞瑟:我到了,發起衝鋒...') if __name__=='__main__': cond = threading.Condition() testA = threading.Thread(target=TestA) testB = threading.Thread(target=TestB) testA.start() testB.start() testA.join() testB.join()
輸出
李白:看見一個敵人,請求支援
亞瑟:等我...
李白:好的
亞瑟:我到了,發起衝鋒...
4.threading.Event()
threading.Event() 原理是線上程中立了一個 Flag ,預設值是 False ,當一個或多個執行緒遇到 event.wait() 方法時阻塞,直到 Flag 值 變為 True 。threading.Event() 通常用來實現執行緒之間的通訊,使一個執行緒等待其他執行緒的通知 ,把 Event 傳遞到執行緒物件中。
event.wait() :阻塞執行緒,直到 Flag 值變為 True
event.set() :設定 Flag 值為 True
event.clear() :修改 Flag 值為 False
event.isSet() : 僅當 Flag 值為 True 時返回
下面這個例子,主執行緒啟動子執行緒後 sleap 2秒,子執行緒因為 event.wait() 被阻塞。當主執行緒醒來後執行 event.set() ,子執行緒才繼續執行,兩者輸出時間差 2s。
import threading import datetime,time class thread(threading.Thread): def __init__(self,name='執行緒' + threadname) self.threadname = int(threadname) def run(self): event.wait() print('子執行緒執行時間:%s'%datetime.datetime.now()) if __name__ == '__main__': event = threading.Event() t1 = thread('0') #啟動子執行緒 t1.start() print('主執行緒執行時間:%s'%datetime.datetime.now()) time.sleep(2) # Flag設定成True event.set() t1.join()
輸出
主執行緒執行時間:2019-05-30 15:51:49.690872
子執行緒執行時間:2019-05-30 15:51:51.691523
5.其他方法
threading.active_count():返回當前存活的執行緒物件的數量
threading.current_thread():返回當前執行緒物件
threading.enumerate():返回當前所有執行緒物件的列表
threading.get_ident():返回執行緒pid
threading.main_thread():返回主執行緒物件
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。