1. 程式人生 > >多工處理方式之二:多執行緒

多工處理方式之二:多執行緒

### 執行緒的理解 - 1、作業系統能夠進行**運算排程**的最小單位,即**程式執行的最小單位** - 2、程序負責程式所必須的資源分配**(文字區域、資料區域、堆疊區域)**,**一個程序**中也經常需要同時做多件事,即要**同時執行多個‘子任務’,這些子任務即執行緒**。 > 執行緒是每一個程序中的**單一順序控制流** ,其包含在程序中,是程序的實際運作單位**(程序是執行緒的容器)** > > ⼀個程式中⾄少要有⼀個程序,⼀個程序中⾄少要有⼀個執行緒 > > 執行緒不能夠獨⽴執⾏,必須依存在程序中 - 3、執行緒基本不佔用系統資源,其只擁有在執行過程中必不可少的資源(如程式計數器、一組暫存器和棧) - 4、**同一個程序中的所有執行緒都共享此程序所擁有的全部資源**, > 一個程序中的執行緒共享相同的記憶體單元/記憶體地址空間——>可以訪問相同的變數和物件,而且它們從同一堆中分配物件——>通訊、資料交換、同步操作 - 5、執行緒之間的**通訊主要通過共享所屬程序的資源** > 執行緒間的通訊是在同一地址空間上進行的,所以不需要額外的通訊機制,這就使得通訊更簡便而且資訊傳遞的速度也更快 - 6、執行緒的**上下文切換很快,資源開銷較少**,但是相對於程序而言,**不夠安全,在多個執行緒共同操作程序的某一資源時,可能會丟失資料** - 7、**執行緒和程序之間的區別** ![](https://img2020.cnblogs.com/blog/2001508/202008/2001508-20200803020537482-778635419.png) **** ### 執行緒的五種狀態 >1. **新狀態**:執行緒物件已經建立,還未呼叫 `start()` 方法。 > 2. **可執行狀態**:當執行緒有資格執行,但排程程式還沒有把它選定為執行執行緒時執行緒所處的狀態。當 `start()`方法呼叫時,執行緒首先進入可執行狀態。線上程執行之後或者從阻塞、等待或睡眠狀態回來後,也返回到可執行狀態。 > 3. **執行狀態**:執行緒排程程式從可執行池中選擇一個執行緒作為當前執行緒時執行緒所處的狀態。這也是執行緒進入執行狀態的唯一方式。 > 4. **等待/阻塞/睡眠狀態**:這是執行緒有資格執行時它所處的狀態。實際上這個三狀態組合為一種,其共同點是:執行緒仍舊是活的(可執行的),但是當前沒有條件執行。但是如果某件事件出現,他可能返回到可執行狀態。 > 5. **死亡態**:當執行緒的`run()`方法完成時就認為它死去。這個執行緒物件也許是活的,但是,它已經不是一個單獨執行的執行緒。執行緒一旦死亡,就不能復生。如果在一個死去的執行緒上呼叫 `start()`方法,會丟擲 `java.lang.IllegalThreadStateException` 異常。 **** ### GIL全域性直譯器鎖 > **Python中的多執行緒可以併發,但不能並行**(同一個程序下的多個執行緒不能被多個cpu同時執行),緣由就是GIL全域性直譯器鎖,導致**同一時間內只有一個執行緒在執行** > python 檔案的執行流程為: > > - 作業系統先將python直譯器和需要執行的py檔案由硬碟載入到記憶體中,開闢一個程序空間 > - 此程序即使得python直譯器首先將py檔案中的程式碼指令通過編譯器編譯成位元組碼 > - 編譯完成的c的位元組碼通過虛擬機器轉換為機器碼由cpu執行 > > 這個執行流程即是py檔案執行程序中的主執行緒 > 若Python中的多執行緒並行,則每個執行緒都要執行上述過程,從而**同一時間需要多個CPU同時執行轉換而來的機器碼**,極大限度的提高執行效率。但眾所周知,Python是由荷蘭人吉多·範羅蘇姆 (Guido van Rossum)於**1989年**聖誕節期間開發的一個新的指令碼解釋程式,而雙核cpu是2005年才被普遍應用的,即在當時的條件下,Cpython執行多執行緒時應用不了多核。故為了**避免多個執行緒併發執行**而造成**資料的不完整以及執行緒的不安全**,龜叔在python的直譯器中加上了互斥鎖——全域性直譯器鎖(GIL鎖),即使得**Cpython在所有執行緒進入直譯器之前加了一個全域性直譯器鎖**,當執行完當前py檔案後才釋放該鎖,這便導致了**python中同一時間內只有一個執行緒在執行** > > > > **注:**若想使得多執行緒並行,**可以用多程序間接實現執行緒的並行**,或者**更換直譯器為Pypy、Ppython**。 **** ### 執行緒建立 #### 使用python中的threading模組中的Thread類建立執行緒 `from threading import Thread` ##### threading模組提供的Thread類來建立執行緒物件 ```python from threading import Thread import os def func(num): print('當前執行緒{},所歸屬的程序id號{}'.format(os.getpid(), num)) for i in range(10): # 非同步建立10個子執行緒 t = Thread(target=func, args=(i,)) t.start() # 主執行緒執行任務 print(os.getpid()) ``` ##### 自定義類繼承Thread類,每次例項化這個類的時候,就等同於例項化執行緒物件 > 這種方法付只需要重寫 `threading.Thread` 類的 `run` 方法,然後呼叫 `start()` 開啟執行緒就可以了 ```python from threading import Thread import time class MyThread(Thread): def __init__(self, name): # 手動呼叫父類的構造方法 super().__init__() self.name = name def run(self): time.sleep(1) print("當前執行緒正在執行runing ... ", self.name) if __name__ == "__main__": t = MyThread("機器今天會再次爆炸麼?") t.start() print("主執行緒執行結束 ... ") ``` #### Thread 類中的基本方法 >- t.is_alive() 檢測執行緒是否仍然存在 > >- t.setName() 設定執行緒名字 > >- t.getName() 獲取執行緒名字 ```python from threading import Thread import time def func(): time.sleep(1) if __name__ == "__main__": t = Thread(target=func) t.start() print(t , type(t)) print(t.is_alive()) # False print(t.getName()) t.setName("xboyww") print(t.getName()) ``` > - currentThread().ident 檢視執行緒id號 > - enumerate() 返回目前正在執行的執行緒列表 > - activeCount() 返回目前正在執行的執行緒數量 ```python from threading import Thread import time from threading import currentThread from threading import enumerate from threading import activeCount # 1.currentThread().ident 檢視執行緒id號 def func(): print("子執行緒id", currentThread().ident, os.getpid()) if __name__ == "__main__": Thread(target=func).start() print("主執行緒id", currentThread().ident, os.getpid()) # 2.enumerate() 返回目前正在執行的執行緒列表 def func(): print("子執行緒id", currentThread().ident, os.getpid()) time.sleep(0.5) if __name__ == "__main__": for i in range(10): Thread(target=func).start() lst = enumerate() # 子執行緒10 + 主執行緒1個 = 11 print(lst ,len(lst)) # 3.activeCount() 返回目前正在執行的執行緒數量 print(activeCount()) ``` **** ### 執行緒池(ThreadPoolExecutor) > **預設如果一個執行緒短時間內可以完成更多的任務,就不會建立額外的新的執行緒,以節省資源** ```python from concurrent.futures import ThreadPoolExecutor from threading import current_thread as cthread def func(i): print("thread ... start", cthread().ident, i) time.sleep(3) print("thread ... end", i) return cthread().ident if __name__ == "__main__": lst = [] setvar = set() # (1) 建立執行緒池物件 """限制執行緒池最多建立os.cpu_count() * 5 = 執行緒數,所有任務全由這幾個執行緒完成,不會額外建立執行緒""" tp = ThreadPoolExecutor() # 我的電腦40個執行緒併發 # (2) 非同步提交任務 for i in range(100): res = tp.submit(func, i) lst.append(res) # (3) 獲取返回值 for i in lst: setvar.add(i.result()) # (4) 等待所有子執行緒執行結束 tp.shutdown() print(len(setvar), setvar) print("主執行緒執行結束 ... ") ``` **** ### 守護執行緒 **守護執行緒 : 等待所有執行緒全部執行完畢之後,再自己終止,守護的是所有執行緒** `執行緒物件.setDaemon(True)` ```python from threading import Thread import time def func1(): while True: time.sleep(0.5) print("我是func1") def func2(): print("我是func2 start ... ") time.sleep(3) print("我是func2 end ... ") t1 = Thread(target=func1) t2 = Thread(target=func2) # 在start呼叫之前,設定守護執行緒 t1.setDaemon(True) t1.start() t2.start() print("主執行緒執行結束 ... ") ``` **** ### 同步 & 非同步 #### 同步 > 同步意味著**順序、統一的時間軸** - 場景1:是指完成事務的邏輯,先執行第一個事務,如果阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,協同步調,按預定的先後次序進行執行 - 場景2:一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列 #### 非同步 > 非同步則意味著**亂序、效率優先的時間軸** - 處理呼叫這個事務之後,不會等待這個事務的處理結果,直接處理第二個事務去了,通過狀態、回撥來通知呼叫者處理結果 - 對於I/O相關的程式來說,非同步程式設計可以大幅度的提高系統的吞吐量,因為在某個I/O操作的讀寫過程中,系統可以先去處理其它的操作(通常是其它的I/O操作) - 不確定執行順序 **** ### 阻塞 & 非阻塞 #### 阻塞 > 程式中有了IO操作,就會發生阻塞,必須要輸入/輸出一個字串,否則程式碼不往下執行 #### 非阻塞 > 程式中沒有任何耗時操作,無需等待正常往下執行 > 同步阻塞 :效率低,cpu利用不充分 > 非同步阻塞 :比如socketserver,可以同時連線多個,但是彼此都有recv > 同步非阻塞:沒有類似input的程式碼,從上到下執行.預設的正常情況程式碼 > 非同步非阻塞:效率是最高的,cpu過度充分,過度發熱, 需液冷 **** ### 序列 & 並行 & 併發 > 假設有A、B兩個任務,則序列、並行、併發的區別如圖所示。 #### 序列 A和B兩個任務執行在一個CPU執行緒上,在A任務執行完之前不可以執行B。即,在整個程式的執行過程中,僅存在一個執行上下文,即一個呼叫棧一個堆。**程式會按順序執行每個指令**。 #### 並行 並行指**兩個或兩個以上任務同一時刻被不同的cpu執行**。在多道程式環境下,並行性使多個程式同一時刻可在不同CPU上同時執行。比如,A和B兩個任務可以同時執行在不同的CPU執行緒上,效率較高,但受限於CPU執行緒數,如果任務數量超過了CPU執行緒數,那麼每個執行緒上的任務仍然是順序執行的。 #### 併發 併發指**多個執行緒在巨集觀(相對於較長的時間區間而言)上表現為同時執行,而實際上是輪流穿插著執行,併發的實質是一個物理CPU在若干道程式之間多路複用**,其目的是提高有限物理資源的執行效率。 併發與並行序列並不是互斥的概念,如果是在一個CPU執行緒上啟用併發,那麼自然就還是序列的,而如果在多個執行緒上啟用併發,那麼程式的執行就可以是既併發 #### 圖示 ![](https://img2020.cnblogs.com/blog/2001508/202008/2001508-20200803020003007-138361090.png) **** ### 執行緒同步 > 由於一個程序中的**多個執行緒享程序中的資源**,所以可能造成**多個執行緒同時修改一個變數的情況(即執行緒⾮安全),可能造成資料混亂**,故需要進⾏**同步控制**,即**執行緒同步**。 > 可以通過**延時**確定多執行緒的執行順序,但不推薦。 > > ```python > import threading > import time > > > def work1(nums): > nums.append(44) > print('-----in work1-----', nums) > > > def work2(nums): > time.sleep(1) > # 延時一會保證另一執行緒執行 > print('-----in work2-----', nums) > > > g_nums = [11, 22, 33] > t1 = threading.Thread(target=work1, args=(g_nums,)) > t1.start() > t2 = threading.Thread(target=work2, args=(g_nums,)) > t2.start() > > ``` #### 互斥鎖(threading模組中定義的Lock類) > 互斥鎖保證了每次只有⼀個執行緒操作共享資料,從⽽保證了多執行緒情況下資料的安全性(原子性),可以實現**執行緒同步**。 > 互斥鎖為資源引入一個狀態:**鎖定/非鎖定**。某個執行緒要**更改共享資料時**,先將其鎖定,此時**資源的狀態為“鎖定”狀態**,其他執行緒不能更改;直到**該執行緒釋放資源**,將資源的**狀態變成“非鎖定”**,其他的執行緒才能再次鎖定該資源。互斥鎖保證了每次只有一個執行緒操作共享資料,從而保證了多執行緒情況下資料的安全性。 > **儘量使用一把鎖解決問題,不要互相巢狀,否則容易死鎖** ```python import threading num = 0 def test1(): global num # 呼叫Lock物件的acquire()方法獲得鎖時,這把鎖進入“locked”狀態 # 如果此時另一個執行緒2試圖獲得這個鎖,該執行緒2就會變為同步阻塞狀態 if mutex.acquire(): for i in range(1000): num += 1 # 呼叫Lock物件的release()方法釋放鎖之後,該鎖進入“unlocked”狀態。 mutex.release() def test2(): global num # 執行緒排程程式繼續從處於同步阻塞狀態的執行緒中選擇一個來獲得鎖,並使得該執行緒進入執行(running)狀態 if mutex.acquire(): for i in range(1000): num += 1 mutex.release() mutex = threading.Lock() p1 = threading.Thread(target=test1) p1.start() p2 = threading.Thread(target=test2) p2.start() print(num) ``` ##### 死鎖(只上鎖,不解鎖) > 在多個執行緒間共享多個資源的時候, 如果**兩個執行緒分別佔有⼀部分資源並且同時等待對⽅的資源**, 就會造成死鎖 > 在多執行緒程式中,死鎖問題很大一部分是由於**執行緒同時獲取多個鎖造成的**。如一個執行緒獲取了第一個鎖,然後在獲取第二個鎖的時候發生阻塞,那麼這個執行緒就可能阻塞其他執行緒的執行,從而導致整個程式假死 ```python import threading import time class MyThread1(threading.Thread): def run(self): # 執行緒1被 A 鎖——>鎖定 if mutexA.acquire(): print(self.name + '---do1---up---') time.sleep(1) if mutexB.acquire(): print(self.name + '---do1---down---') mutexB.release() # 執行緒1被 A 鎖釋放的前提是:執行緒1 搶到 B 鎖 mutexA.release() class MyThread2(threading.Thread): def run(self): time.sleep(1) # 執行緒2被 B 鎖——>鎖定 if mutexB.acquire(): print(self.name + '---do2---up---') if mutexA.acquire(): print(self.name + '---do2---down---') mutexA.release() # 執行緒2被 B 鎖釋放的前提是:執行緒2 搶到 A 鎖 mutexB.release() if __name__ == '__main__': mutexA = threading.Lock() mutexB = threading.Lock() t1 = MyThread1() t2 = MyThread2() t1.start() t2.start() # Thread-1---do1---up--- # Thread-2---do2---up--- # 程式卡死 # 執行緒1不釋放A鎖 # 執行緒2不釋放B鎖 ``` ##### 遞迴鎖(threading模組中定義的RLock類) > 用於快速解決專案**因死鎖問題不能正常執行**的場景,用來處理異常死鎖的 ```python import threading import time class MyThread1(threading.Thread): def run(self): if mutexA.acquire(): print(self.name + '---do1---up---') time.sleep(1) if mutexB.acquire(): print(self.name + '---do1---down---') mutexB.release() mutexA.release() class MyThread2(threading.Thread): def run(self): time.sleep(1) if mutexB.acquire(): print(self.name + '---do2---up---') if mutexA.acquire(): print(self.name + '---do2---down---') mutexA.release() mutexB.release() if __name__ == '__main__': mutexA = threading.RLock() mutexB = threading.RLock() t1 = MyThread1() t2 = MyThread2() t1.start() t2.start() # Thread-1---do1---up--- # Thread-1---do1---down--- # Thread-2---do2---up--- # Thread-2---do2---down--- ``` #### 訊號量(threading模組中定義的Semaphore類) > 訊號量 `semaphore`:用於控制**同一時間內可以操作程序資源的執行緒數量**的一把鎖,簡言之訊號量是用來**控制執行緒併發數的一把鎖**,也可以實現**執行緒同步**。 > > 使用場景:在讀寫檔案的時候,一般只有一個執行緒在寫,而讀可以有多個執行緒同時進行,如果需要限制同時讀檔案的執行緒個數,這時候就可以用到訊號量了(如果用互斥鎖,就是限制同一時刻只能有一個執行緒讀取檔案) ```python import time import threading def foo(se): se.acquire() time.sleep(2) print("ok") se.release() if __name__ == "__main__": # 設定同一時間內可以有5個執行緒併發 se = threading.Semaphore(5) for i in range(20): t1 = threading.Thread(target=foo, args=(se,)) t1.start() # 此時可以控制同時進入的執行緒數 ``` #### 執行緒佇列(queue模組) > **通過3種類型的佇列來實現執行緒同步**,都實現了鎖原語(可以理解為原⼦操作, 即要麼不做, 要麼就做完) , 能夠在多執行緒中直接使⽤ ##### queue.Queue:FIFO(先⼊先出) 佇列 Queue ```python # 基本使用 from queue import Queue # put 存 # get 取 # put_nowait 存,超出了佇列長度,報錯 # get_nowait 取,沒資料取不出來,報錯 # linux windows 執行緒中put_nowait,get_nowait都支援 """先進先出,後進後出""" # maxsize為一個整數,表示佇列的最大條目數,可用來限制記憶體的使用。 # 一旦佇列滿,插入將被阻塞直到佇列中存在空閒空間。如果maxsize小於等於0,佇列大小為無限。maxsize預設為0 q = Queue(maxsize=0) q.put(1) q.put(2) print(q.get()) print(q.get()) # 取不出來,阻塞 # print(q.get()) print(q.get_nowait()) q2 = Queue(3) q2.put(11) q2.put(22) q2.put(33) # 放不進去了,阻塞 # q2.put(44) q2.put_nowait(44) ``` ```python import threading import time from queue import Queue class Pro(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count + 1 msg = '生成產品' + str(count) queue.put(msg) # 佇列中新增新產品 print(msg) time.sleep(1) class Con(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + '消費了' + queue.get() print(msg) time.sleep(1) if __name__ == "__main__": queue = Queue() # 建立一個佇列。執行緒中能用,程序中不能使用 for i in range(500): # 建立500個產品放到佇列裡 queue.put('初始產品' + str(i)) # 字串放進佇列 for i in range(2): # 建立了兩個執行緒 p = Pro() p.start() for i in range(5): # 5個執行緒 c = Con() c.start() ``` ##### queue.LifoQueue:LIFO(後⼊先出) 棧 LifoQueue ```python # LifoQueue 先進後出,後進先出(按照棧的特點設計) from queue import LifoQueue lq = LifoQueue(3) lq.put(11) lq.put(22) lq.put(33) # print(lq.put_nowait(444)) print(lq.get()) print(lq.get()) print(lq.get()) ``` ##### queue.PriorityQueue:(優先順序佇列) PriorityQueue ```python # PriorityQueue 按照優先順序順序排序 (預設從小到大排序) from queue import PriorityQueue # 如果都是數字,預設從小到大排序 pq = PriorityQueue() pq.put(13) pq.put(3) pq.put(20) print(pq.get()) print(pq.get()) print(pq.get()) # 如果都是字串 """如果是字串,按照ascii編碼排序""" pq1 = PriorityQueue() pq1.put("chinese") pq1.put("america") pq1.put("latinos") pq1.put("blackman") print(pq1.get()) print(pq1.get()) print(pq1.get()) print(pq1.get()) # 要麼全是數字,要麼全是字串,不能混合 error """ pq2 = PriorityQueue() pq2.put(13) pq2.put("aaa") pq2.put("擬稿") """ pq3 = PriorityQueue() # 預設按照元組中的第一個元素排序 pq3.put( (20,"wangwen") ) pq3.put( (18,"wangzhen") ) pq3.put( (30,"weiyilin") ) pq3.put( (40,"xiechen") ) print(pq3.get()) print(pq3.get()) print(pq3.get()) print(pq3.get()) ``` #### 生產消費者模式 - 程序(執行緒)之間如果直接通訊,可能會出現兩個問題 - 耦合性太強 - 速率有可能不匹配 解決方式,找一個緩衝區來中轉資料即**生產者——消費者模式** **** ### 執行緒非同步 #### 通過回撥函式可以實現多執行緒非同步執行 > 回撥函式: > 把函式當成引數傳遞給另外一個函式 > 在當前函式執行完畢之後,最後呼叫一下該引數(函式),這個函式就是回撥函式 > > 功能: > 列印狀態: a屬性 > 支付狀態: b屬性 > 退款狀態: c屬性 > 轉賬的狀態: d屬性 > 把想要的相關成員或者相關邏輯寫在自定義的函式中 > 支付寶介面在正常執行之後,會呼叫自定義的函式,來執行相應的邏輯 > 那麼這個函式就是回撥函式 ```python from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import current_thread as cthread import os, time def func1(i): print("Process start ... ", os.getpid()) time.sleep(0.5) print("Process end ... ", i) return "*" * i def func2(i): print("thread start ... ", cthread().ident) time.sleep(0.5) print("thread end ... ", i) return "*" * i def call_back1(obj): print("<==回撥函式callback程序號:===>", os.getpid()) print(obj.result()) def call_back2(obj): print("<==回撥函式callback執行緒號:===>", cthread().ident) print(obj.result()) # (1) 程序池的回撥函式: 由主程序執行呼叫完成 if __name__ == "__main__": p = ProcessPoolExecutor(5) for i in range(1, 11): res = p.submit(func1, i) # 程序物件.add_done_callback(回撥函式) ''' add_done_callback 可以把res本物件和回撥函式自動傳遞到函式裡來 ''' res.add_done_callback(call_back1) p.shutdown() print("主程序執行結束 ... ", os.getpid()) # (2) 執行緒池的回撥函式: 由當前子執行緒執行呼叫完成 if __name__ == "__main__": tp = ThreadPoolExecutor(5) for i in range(1, 11): res = tp.submit(func2, i) # 執行緒物件.add_done_callback(回撥函式) ''' add_done_callback 可以把res本物件和回撥函式自動傳遞到函式裡來 ''' res.add_done_callback(call_back2) tp.shutdown() print("主執行緒執行結束 ... ", cthread().ident) ``` ```python from multiprocessing import Pool import random import time def download(f): for i in range(1, 4): print(f"{f}下載檔案{i}") time.sleep(random.randint(1, 3)) return "下載完成" def alterUser(msg): print(msg) if __name__ == "__main__": p = Pool(3) # 當func執行完畢後,return的東西會給到回撥函式callback p.apply_async(func=download, args=("執行緒1",), callback=alterUser) p.apply_async(func=download, args=("執行緒2",), callback=alterUser) p.apply_async(func=download, args=("執行緒3",), callback=alterUser) p.close() p.join() ```