python --- 基礎多線程編程
在python中進行多線程編程之前必須了解的問題:
1. 什麽是線程?
答:線程是程序中一個單一的順序控制流程。進程內一個相對獨立的、可調度的執行單元,是系統獨立調度和分派CPU的基本單位指運行中的程序的調度單位。
2. 什麽是多線程?
答:在單個程序中同時運行多個線程完成不同的工作,稱為多線程。
3. 多線程編程的目的?
答:多線程是為了同步完成多項任務,不是為了提高運行效率,而是為了提高資源使用效率來提高系統的效率。線程是在同一時間需要完成多項任務的時候實現的。
4. 如何再python中執行多線程編程?
答:在python2.x的版本中提供了thread(這個模塊為多線程提供了一個底層 、原始的操作[也可以成為light-weight processes 或者 tasks) — 多個控制線程共享全局數據空間。為了多線程同步,提供了簡單的鎖(也稱呼為 mutexes 後者 binary semaphores) 。]和threading(本模塊的高層線程接口構建在低層的thread模塊上)兩個模塊用於線程操作;而在python3.x中,官方只給出了threading模塊的文檔,對於底層線程造作放在了_thread模塊中(即不建議使用)。是故在python中使用threading模塊編程即可。
例一(一個簡單的雙線程程序):
1 import threading 2 import time 3 4 def run(n): 5 print("task-%s" % n) 6 time.sleep(5) 7 8 #實例化一個線程對象,target傳入任務名,args以元組的形式傳入任務函數的參數 9 task1 = threading.Thread(target=run, args=(1,)) 10 task2 = threading.Thread(target=run, args=(2,)) 11 12 task1.start() #線程啟動test_threads_113 task2.start()
註:執行上面這個程序一共花費5秒左右的時間,而如果分別調用兩次run怎需要10秒(證明兩個任務同時運行)。
例二(用面向對象編程實現例二,並講解threading模塊中的一些常用方法):
1 import threading 2 import time 3 4 class Task(threading.Thread): 5 def __init__(self, n): 6 super(Task, self).__init__() #重構了__init__(), 所以先執行父類的構造函數test_threads_27 self.n = n #構造函數中傳入任務參數參數 8 9 #任務函數 10 def run(self): 11 """ 12 不同於例一,在類中,任務函數只能以run命名,參數從構造函數中傳入 13 :return: 14 """ 15 print("task-%s" % self.n) 16 # print(threading.current_thread()) #threading.current_thread()返回當前的Thread對象,對應於調用者控制的線程。 17 time.sleep(2) 18 19 #實例化並啟動 20 t1 = Task(1) 21 t2 = Task(2) 22 # t1.setDaemon(True) #將t1設置成為守護線程 23 # t2.setDaemon(True) 24 25 t1.start() 26 #t1.join(timeout=5) #等待t1結束再運行t2,timeout代表等待時間,超過5秒便不再等待。 27 t2.start() 28 29 # print(threading.current_thread()) 30 # print(threading.active_count()) #threading.active_count()返回當前處於alive狀態的Thread對象的個數。
註:建議使用例二這種編程方式,在類中,任務函數只能以run命名,參數從構造函數中傳入。
註:有時候難免會遇到一個問題,主程序的執行需要某個子線程(即使用start啟動的線程,默認子線程一旦啟動主程序將繼續運行,兩者不再有關聯)的執行結果,這時使用join即可,例如例二中的t1.join()。
守護線程:一種特殊的子線程,與主線程(即本文中的主程序,由程序使用者啟動)存在一種“主死仆死”的關系,即主線程意外停止或運行結束,不管守護線程是否運行完畢都得終止(非守護線程的子線程啟動後與主線程沒有聯系,不論主線程是否還在運行,子線程不受影響),一個線程是否為守護線程繼承於創建它的線程(將一個子線程設置成為守護線程可參考例二,去掉22,23行註釋,則主程序結束t1,t2也結束,不會再sleep)。
在進行多線程編程時,經常會有多個線程同時對同一份數據修改,這便會導致最終得得到的數據不是預期結果。所以需要在一個線程修改數據時將數據鎖定,只允許當前線程修改,當修改完成後,解鎖讓其他線程修改。在threading模塊中使用Lock Objects解決此問題。
Lock Objects:
A primitive lock is a synchronization primitive that is not owned by a particular thread when locked. In Python, it is currently the lowest level synchronization primitive available, implemented directly by the _thread
extension module.
A primitive lock is in one of two states, “locked” or “unlocked”. It is created in the unlocked state. It has two basic methods, acquire()
and release()
. When the state is unlocked, acquire()
changes the state to locked and returns immediately. When the state is locked, acquire()
blocks until a call to release()
in another thread changes it to unlocked, then the acquire()
call resets it to locked and returns. The release()
method should only be called in the locked state; it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a RuntimeError
will be raised.
Locks also support the context management protocol.
When more than one thread is blocked in acquire()
waiting for the state to turn to unlocked, only one thread proceeds when a release()
call resets the state to unlocked; which one of the waiting threads proceeds is not defined, and may vary across implementations.
All methods are executed atomically.
例三(多個線程對一個全局變量進行加1操作):
1 import threading,time 2 3 4 n = 5 5 6 class Task(threading.Thread): 7 def __init__(self, n): 8 super(Task, self).__init__() 9 self.name = "Thread-" + str(n) 10 11 def run(self): 12 """ 13 不同於例一,在類中,任務函數只能以run命名,參數從構造函數中傳入 14 :return: 15 """ 16 global n 17 time.sleep(0.5) 18 #鎖定資源, 此時關於n的操作只能此線程執行 19 if lock.acquire(): 20 n += 1 21 print("%s, n = %s" % (self.name, n)) 22 time.sleep(0.2) 23 lock.release() #解鎖 24 25 if __name__ == "__main__": 26 27 lock = threading.Lock() #實例化一個鎖,此時鎖處於打開狀態 28 29 ts = [] #用於存儲線程對象 30 #循環啟動50個線程 31 for i in range(1, 50+1): 32 t = Task(i) 33 ts.append(t) 34 t.start() 35 #等待所有線程結束 36 start_time = time.time() 37 for t in ts: 38 t.join() 39 40 print("run time:",time.time() - start_time)test_threads_3
註:運行此程序,可發現一共用時10.5(0.2*50 + 0.5)秒左右,可以發現在鎖定代碼段程序沒有並行執行。
註:有時候需要多層加鎖,這時Lock Objects已經滿足不了這個需求(當Lock處於locked時,遇到下一個acquire()時會阻塞)。這時我們使用RLock Objects,它的調用同Lock Objects。
信號量:信號量(Semaphore),有時被稱為信號燈,是在多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被並發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那麽該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。
舉例說明:以一個停車場的運作為例。簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時如果同時來了五輛車,看門人允許其中三輛直接進入,然後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,打開車攔,放入外面的一輛進去,如果又離開兩輛,則又可以放入兩輛,如此往復。
在這個停車場系統中,車位是公共資源,每輛車好比一個線程,看門人起的就是信號量的作用。 信號量的特性:信號量是一個非負整數(車位數),所有通過它的線程/進程(車輛)都會將該整數減一(通過它當然是為了使用資源),當該整數值為零時,所有試圖通過它的線程都將處於等待狀態。 簡單來說,當使用Lock或RLock的locked期間,沒有並發執行(只unlocked後才並行);而使用Semaphore可以讓有限個線程共同訪問資源,它的使用也是只有acquire()和release()。 例四(信號量的使用):1 import threading 2 import time 3 4 class Task(threading.Thread): 5 #任務函數 6 def __init__(self, n): 7 super(Task, self).__init__() 8 self.name = "Thread-" + str(n) 9 10 def run(self): 11 semaphore.acquire() 12 print("%s" % self.name) 13 time.sleep(2) 14 semaphore.release() 15 16 17 if __name__ == "__main__": 18 19 semaphore = threading.BoundedSemaphore(5) #只允許5個線程同時訪問資源 20 21 ts = [] #用於存儲線程對象 22 #循環啟動50個線程 23 24 for i in range(1, 23+1): 25 t = Task(i) 26 ts.append(t) 27 t.start()test_threads_4
註:通過運行上面的代碼,可以發現在關鍵代碼段最多只有5個線程在同時運行。 每調用一次acquire(),Semaphore的計數器減1;每調用一次release(),Semaphore的計數器加1,最大不超過設置的計數器初始值。 既然有了多線程編程,那麽自然也就有了線程之間的交互。在python中threading模塊提供了Event Objocts實現這個功能。 Events Objects: This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it(事件對象是線程間最簡單的通信機制之一:線程可以激活在一個事件對象上等待的其他線程).
An event object manages an internal flag that can be set to true with the set()
method and reset to false with the clear()
method. The wait()
method blocks until the flag is true(每個事件對象管理一個內部標誌,可以在事件對象上調用set()
方法將內部標誌設為true,調用 clear()
方法將內部標誌重置為false。wait()
方法將阻塞直至該標誌為真。).
例五(簡單的線程交互程序,演示Event Objects的使用):
1 import threading,time 2 3 class Task1(threading.Thread): 4 def run(self): 5 while True: 6 for i in range(1, 20+1): 7 print("i = ", i) 8 if i % 5 == 0: 9 eve.set() #到5的倍數,將Event內部標誌設置為True 10 time.sleep(0.5) 11 12 class Task2(threading.Thread): 13 def run(self): 14 while True: 15 #檢測內部標誌是否為True 16 if eve.is_set(): 17 print("hello world") 18 time.sleep(2) 19 eve.clear() #重置Event內部標誌 20 else: 21 eve.wait() #內部標誌不為True,此線程處於阻塞狀態 22 23 if __name__ == "__main__": 24 t1 = Task1() 25 t2 = Task2() 26 eve = threading.Event() #實例化一個Event Objects 27 28 t1.start() 29 t2.start()test_threads_5
python --- 基礎多線程編程