多線程(threading module)
一、線程與進程
線程定義:線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。
進程定義:An executing instance of a program is called a process.(程序的執行實例稱為進程。)
線程與進程的區別:
1. 線程共享創建它的進程的地址空間; 進程有自己的地址空間。
2. 線程可以直接訪問其進程的數據段; 進程擁有自己父進程數據段的副本。
3. 線程可以直接與其進程的其他線程通信; 進程必須使用進程間通信來與兄弟進程通信。
4. 新線程很容易創建; 新流程需要復制父流程。
5. 線程可以對同一進程的線程進行相當大的控制; 進程只能控制子進程。
6. 對主線程的更改(取消,優先級更改等)可能會影響進程的其他線程的行為; 對父進程的更改不會影響子進程。
二、Python GIL(Global Interpreter Lock)
--> 全局解釋器鎖 :在同一時刻,只能有一個線程進入解釋器。
三、threading 模塊
3.1 線程的2種調用方式
直接調用
1 import threading 2 import time 3 4 def sayhi(num): #定義每個線程要運行的函數 5 6 print("running on number:%s" %num) 7 8 time.sleep(3) 9 10 if __name__ == ‘__main__‘: 11 12 t1 = threading.Thread(target=sayhi,args=(1,)) #View Code生成一個線程實例 13 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程實例 14 15 t1.start() #啟動線程 16 t2.start() #啟動另一個線程 17 18 print(t1.getName()) #獲取線程名 19 print(t2.getName())
繼承式調用
1 import threading 2 import time 3 4 5 class MyThread(threading.Thread): 6 defView Code__init__(self,num): 7 threading.Thread.__init__(self) 8 self.num = num 9 10 def run(self):#定義每個線程要運行的函數 11 12 print("running on number:%s" %self.num) 13 14 time.sleep(3) 15 16 if __name__ == ‘__main__‘: 17 18 t1 = MyThread(1) 19 t2 = MyThread(2) 20 t1.start() 21 t2.start()
3.2 常用方法(Join/Daemon)
join() --> 在子線程完成運行之前,這個子線程的父線程將一直被阻塞。
setDaemon(True) --> 將線程聲明為守護線程,必須在start() 方法調用之前設置,守護線程隨主線程結束而結束。
其他方法:
1 threading 模塊提供的其他方法: 2 # threading.currentThread(): 返回當前的線程變量。 3 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。 4 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。 5 # 除了使用方法外,線程模塊同樣提供了Thread類來處理線程,Thread類提供了以下方法: 6 # run(): 用以表示線程活動的方法。 7 # start():啟動線程活動。 8 # join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者拋出未處理的異常-或者是可選的超時發生。 9 # isAlive(): 返回線程是否活動的。 10 # getName(): 返回線程名。 11 # setName(): 設置線程名。Method
3.3 同步鎖(Lock)
r = threading.Lock() r.acquire() --> 加鎖 r.release() --> 解鎖
1 import time 2 import threading 3 4 def addNum(): 5 global num #在每個線程中都獲取這個全局變量 6 # num-=1 7 lock.acquire() 8 temp=num 9 print(‘--get num:‘,num ) 10 #time.sleep(0.1) 11 num =temp-1 #對此公共變量進行-1操作 12 lock.release() 13 14 num = 100 #設定一個共享變量 15 thread_list = [] 16 lock=threading.Lock() 17 18 for i in range(100): 19 t = threading.Thread(target=addNum) 20 t.start() 21 thread_list.append(t) 22 23 for t in thread_list: #等待所有線程執行完畢 24 t.join() 25 26 print(‘final num:‘, num )View Code
3.4 線程死鎖和遞歸鎖
在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個線程在無外力作用下將一直等待下去。下面是一個死鎖的例子:
1 import threading,time 2 3 class myThread(threading.Thread): 4 def doA(self): 5 lockA.acquire() 6 print(self.name,"gotlockA",time.ctime()) 7 time.sleep(3) 8 lockB.acquire() 9 print(self.name,"gotlockB",time.ctime()) 10 lockB.release() 11 lockA.release() 12 13 def doB(self): 14 lockB.acquire() 15 print(self.name,"gotlockB",time.ctime()) 16 time.sleep(2) 17 lockA.acquire() 18 print(self.name,"gotlockA",time.ctime()) 19 lockA.release() 20 lockB.release() 21 def run(self): 22 self.doA() 23 self.doB() 24 if __name__=="__main__": 25 26 lockA=threading.Lock() 27 lockB=threading.Lock() 28 threads=[] 29 for i in range(5): 30 threads.append(myThread()) 31 for t in threads: 32 t.start() 33 for t in threads: 34 t.join()deadLock
解決辦法:使用遞歸鎖
即重新定義一把鎖:lock = threading.RLock() --> 遞歸鎖
將所有的鎖替換為遞歸鎖即可。遞歸鎖可以重復加鎖。
RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
3.5 信號量(Semaphore)--> 相當於一把鎖
信號量用來控制線程並發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器為 0時,acquire()將阻塞線程至同步鎖定狀態,直到其他線程調用release()。(類似於停車位的概念)
BoundedSemaphore與Semaphore的唯一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了將拋出一個異常。
1 import threading,time 2 class myThread(threading.Thread): 3 def run(self): 4 if semaphore.acquire(): 5 print(self.name) 6 time.sleep(5) 7 semaphore.release() 8 if __name__=="__main__": 9 semaphore=threading.Semaphore(5) 10 thrs=[] 11 for i in range(100): 12 thrs.append(myThread()) 13 for t in thrs: 14 t.start()Semaphore
3.6 條件變量同步(Condition)--> 鎖
有一類線程需要滿足條件之後才能夠繼續執行,Python提供了threading.Condition 對象用於條件變量線程的支持,它除了能提供RLock()或Lock()的方法外,還提供了 wait()、notify()、notifyAll()方法。
lock_con=threading.Condition([Lock/Rlock]): 鎖是可選選項,不傳入鎖,對象自動創建一個RLock()。
1 wait():條件不滿足時調用,線程會釋放鎖並進入等待阻塞; 2 notify():條件創造後調用,通知等待池激活一個線程; 3 notifyAll():條件創造後調用,通知等待池激活所有線程。
1 import threading,time 2 from random import randint 3 class Producer(threading.Thread): 4 def run(self): 5 global L 6 while True: 7 val=randint(0,100) 8 print(‘生產者‘,self.name,":Append"+str(val),L) 9 if lock_con.acquire(): 10 L.append(val) 11 lock_con.notify() 12 lock_con.release() 13 time.sleep(3) 14 class Consumer(threading.Thread): 15 def run(self): 16 global L 17 while True: 18 lock_con.acquire() 19 if len(L)==0: 20 lock_con.wait() 21 print(‘消費者‘,self.name,":Delete"+str(L[0]),L) 22 del L[0] 23 lock_con.release() 24 time.sleep(0.25) 25 26 if __name__=="__main__": 27 28 L=[] 29 lock_con=threading.Condition() 30 threads=[] 31 for i in range(5): 32 threads.append(Producer()) 33 threads.append(Consumer()) 34 for t in threads: 35 t.start() 36 for t in threads: 37 t.join()Condition Demo
3.7 同步條件(Event)
條件同步和條件變量同步差不多意思,只是少了鎖功能,因為條件同步設計於不訪問共享資源的條件環境。event=threading.Event():條件環境對象,初始值 為False;
1 event.isSet():返回event的狀態值; 2 3 event.wait():如果 event.isSet()==False將阻塞線程; 4 5 event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; 6 7 event.clear():恢復event的狀態值為False。
示例:
1 import threading,time 2 class Boss(threading.Thread): 3 def run(self): 4 print("BOSS:今晚大家都要加班到22:00。") 5 event.isSet() or event.set() 6 time.sleep(5) 7 print("BOSS:<22:00>可以下班了。") 8 event.isSet() or event.set() 9 class Worker(threading.Thread): 10 def run(self): 11 event.wait() 12 print("Worker:哎……命苦啊!") 13 time.sleep(0.25) 14 event.clear() 15 event.wait() 16 print("Worker:OhYeah!") 17 if __name__=="__main__": 18 event=threading.Event() 19 threads=[] 20 for i in range(5): 21 threads.append(Worker()) 22 threads.append(Boss()) 23 for t in threads: 24 t.start() 25 for t in threads: 26 t.join()View Code
3.8 隊列(queue)-->多線程利器
queue中的方法:
創建一個“隊列”對象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。 將一個值放入隊列中 q.put(10) 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為 1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。 將一個值從隊列中取出 q.get() 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。 Python Queue模塊有三種隊列及構造函數: 1、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 2、LIFO類似於堆,即先進後出。 class queue.LifoQueue(maxsize) 3、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) 此包中的常用方法(q = Queue.Queue()): q.qsize() 返回隊列的大小 q.empty() 如果隊列為空,返回True,反之False q.full() 如果隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 相當q.get(False) 非阻塞 q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味著等到隊列為空,再執行別的操作
示例:
1 import threading,queue 2 from time import sleep 3 from random import randint 4 class Production(threading.Thread): 5 def run(self): 6 while True: 7 r=randint(0,100) 8 q.put(r) 9 print("生產出來%s號包子"%r) 10 sleep(1) 11 class Proces(threading.Thread): 12 def run(self): 13 while True: 14 re=q.get() 15 print("吃掉%s號包子"%re) 16 if __name__=="__main__": 17 q=queue.Queue(10) 18 threads=[Production(),Production(),Production(),Proces()] 19 for t in threads: 20 t.start()Demo
多線程(threading module)