第37天併發程式設計之執行緒篇
一. 執行緒初識
什麼是執行緒和程序
程序指的是一個程式執行的過程,是一個資源單位。它包含了作業系統開闢記憶體空間,將應用程式載入到記憶體中以及執行應用程式程式碼的整個過程。就像是一個車間內的一個小工廠一樣,整個的生產過程被稱之為一個程序。
執行緒是作業系統真正的執行單元。它僅僅代表的是程序中的最後一步,也就是執行應用程式程式碼的過程。就像是車間裡的一條條流水線一樣,是真正的用來執行程式碼的一個過程。
執行緒和程序的區別
- 程序佔用開銷較大,這是因為程序要重新開闢一段記憶體空間,執行緒開銷較小。
程序之間記憶體是物理隔離的,但是同一程序內的執行緒之間記憶體是共享的。注意不同程序之間的執行緒通訊還是需要通過佇列進行通訊的。 from threading import Thread import time
# 在主執行緒中設定一個全域性變數x x = 100 def task(): global x x = 0 t = Thread(target=task) t.start() t.join() # 此時打印出來的結果是0,說明同一個程序之間執行緒資料是共享的 print(x)
開啟執行緒的兩種方式
通過類Thread
因為執行緒開銷較小的原因,我們會發現結果是先列印task裡面的內容,然後列印主執行緒三個字,這個和程序正好相反。
from threading import Thread import time
def task(): print('thread is running ....') time.sleep(3)
Thread(target=task).start() print('主執行緒')
通過繼承類Thread
我們建立一個執行緒並且執行的過程本質上就是建立記憶體之後執行Thread類的run函式,因此我們可以通過繼承類的方式去建立。
from threading import Thread import time
class MyThread(Thread): def run(self): print('thread is running ....') time.sleep(3)
MyThread().start() print('主執行緒')
執行緒相關的屬性方法
# current_thread() 當前執行緒物件,.name是裡面的一個屬性,可以得到執行緒名稱,主執行緒名稱為MainTread # active_count() 當前活躍的執行緒數,記得還有一個主執行緒 # enumerate 返回一個列表,裡面都是當前活躍的執行緒數目 from threading import Thread, current_thread, active_count, enumerate import time
def task(): print('%s is running' % current_thread().name) time.sleep(3)
#: name表示自定義執行緒名稱 t = Thread(target=task, name='第一個執行緒') t.start() print(current_thread().name) print(active_count()) print(enumerate())
守護程序與守護執行緒
守護程序 對於程序而言,如果程式碼中有守護程序,也有非守護程序,等主程序程式碼執行完畢之後守護程序也就結束了,並不會等待非守護程序的執行。 from multiprocessing import Process
def task1(): print(123) time.sleep(1) print('end123') def task2(): print(456) time.sleep(3) print('end456') if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) p1.daemon = True p1.start() p2.start() # 當執行完print之後就代表主程序程式碼已經執行完畢,此時就會終止守護程序 # 所以不會列印task1裡面的內容 # 但是還是要等待非守護程序結束之後主程序才會真正的結束 # 因此我們看到了task2裡面的內容 print('主程序over...') # 執行結果; # 主程序over... # 456 # end456
守護執行緒 對於執行緒而言,如果程式碼中有守護執行緒,也有非守護執行緒,等主執行緒程式碼執行完畢之後並不會終止守護執行緒的執行,只有等到所有的非守護執行緒執行完畢之後才意味著主執行緒的結束,此時才會總之守護執行緒。 將上面的程式碼的程序改成執行緒,就會出現不一樣的效果。 from threading import Thread import time
二.鎖def task1(): print(123) time.sleep(5) print('end123') def task2(): print(456) time.sleep(3) print('end456') t1 = Thread(target=task1) t2 = Thread(target=task2) t1.daemon = True t1.start() t2.start() # 當print程式碼執行完畢之後,就代表這主執行緒程式碼執行完畢了,但是並不是主執行緒執行完畢了,這個是和程序的一個區別 # 因此要等待非守護執行緒t2執行完畢之後才代表主執行緒真的結束了,此時task1作為守護程序也就被終止了 # 因此我們會看到能夠列印全部的task2內容,但是不會列印task1的內容 print('主執行緒..') # 執行結果 # 123 # 456 # 主執行緒.. # end456
互斥鎖 執行緒中的互斥鎖和程序中的互斥鎖都是為了解決多個程序或者執行緒同時訪問同一個系統資源時出現數據混亂的問題,不同之處在於所使用模組不一樣,因此執行緒互斥鎖只能在執行緒中使用,程序互斥鎖只能在程序中使用。 不使用執行緒鎖的問題 from threading import Thread import time
x = 100
def task(): global x temp = x time.sleep(0.1) x = temp - 1
#: 建立100個執行緒全域性變數x進行修改,每次減1 t_l = [] for i in range(100): t = Thread(target=task) t_l.append(t) t.start()
# 用來等待所有執行緒結束 for i in t_l: i.join()
#: 當我們列印資料的時候發現結果是99,為什麼呢? #: 因為每個執行緒temp = x然後等待i/o,當cpu再次切換過來執行的時候temp=100,x的值就是99 print(x) 因此在一些資料的互動過程中我們需要加上執行緒鎖來保證資料的安全性 from threading import Thread, Lock import time
x = 100 mutex = Lock() # 建立鎖
def task(): global x mutex.acquire() # 在資料修改之前新增鎖 temp = x time.sleep(0.1) x = temp - 1 mutex.release() # 在資料修改之後釋放鎖
#: 建立100個執行緒全域性變數x進行修改,每次減1 t_l = [] for i in range(100): t = Thread(target=task) t_l.append(t) t.start()
# 用來等待所有執行緒結束 for i in t_l: i.join()
print(x)死鎖 死鎖的現象就是兩個執行緒(或者兩個程序)A和B,還有兩個互斥鎖C和D。A和B都需要C和D這兩把鎖,但是A搶到了C卻沒有搶到B,而B搶到了D卻沒有搶到C,從而導致程式進入死鎖狀態。 from threading import Thread, Lock, current_thread
mutex1 = Lock() mutex2 = Lock()
def task1(): mutex1.acquire() print('%s 搶到了鎖1' % current_thread().name) # 當搶到鎖1之後,cpu就會執行其他的程式 # 當再回來的時候卻發現鎖2被task2搶到了因此在等待task2釋放鎖2 # 但是此時task2和task1是一樣的,在等待task1釋放鎖1,此時就進入了死鎖的狀態 time.sleep(0.1) mutex2.acquire() print('%s搶到了鎖2' % current_thread().name)
mutex2.release() print('%s釋放了鎖2' % current_thread().name) mutex1.release() print('%s釋放了鎖1' % current_thread().name)
def task2(): mutex2.acquire() print('%s 搶到了鎖1' % current_thread().name) time.sleep(0.1) mutex1.acquire() print('%s搶到了鎖2' % current_thread().name)
mutex1.release() print('%s釋放了鎖2' % current_thread().name) mutex2.release() print('%s釋放了鎖1' % current_thread().name)
t1 = Thread(target=task1) t2 = Thread(target=task2) t1.start() t2.start() 3.遞迴鎖 遞迴鎖所使用的是RLock函式,其原理是如果我自己需要多把鎖的時候,我就把這多把鎖設定成一個遞迴鎖,搶到一次遞迴鎖計數就加1,當其他的執行緒或者程序想使用這一把鎖的時候,會首先去檢視鎖計數是否為0,如果不為零,就等待其他的程序或者執行緒來釋放鎖,這就可以解決死鎖問題了。 # mutex1 = Lock() # mutex2 = Lock() # 只需要將上面的鎖修改成遞迴鎖就可以解決上面的死鎖問題了 mutex1 = mutex2 = RLock() 4.訊號量 訊號量使用的是Semaphore類,我們可以給他傳遞一個值來代表一次可以同時執行幾個執行緒或者是幾個程序,類似於一種池的概念,在某種意義上它是不能夠保證資料的安全性。只是說在某些沒有資料互動的場景下我們可以控制其每次程序或者執行緒的數量。 from threading import Thread, Semaphore, current_thread import time import random
s = Semaphore(5) x = 30 def go_wc(): global x s.acquire() temp = x time.sleep(random.randint(1, 3)) x = temp - 1 s.release()
t_l = [] for i in range(30): t = Thread(target=go_wc) t_l.append(t) t.start()
for i in t_l: i.join()
print(x) 5.GIL全域性直譯器鎖python程式執行的三個步驟 (1). 開闢一塊記憶體空間,啟動Python直譯器 (2). 載入python程式到記憶體中 (3). 將記憶體中的程式傳遞給python直譯器一步一步執行
- 什麼叫做全域性直譯器鎖 # In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple # native threads from executing Python bytecodes at once. # GIL全域性直譯器鎖是一種互斥鎖,在同一時刻,保證多個執行緒中只能有一個執行緒使用python直譯器
多執行緒是序列還是併發還是並行 多執行緒其實也是併發的,序列指的是task1完全執行完畢之後才去執行task2,如下的程式碼解析。這也是CPython的一大詬病之一,雖然說在I/O密集型的操作中並不會太多的去影響效能,但是相對於多核多執行緒併發的來說,很顯然CPython做的還不是很好。(以目前我的水品來看,覺得如果一個執行緒內可以有多個python直譯器就好了.......哈哈哈) from threading import Thread, current_thread
def task1(): # 1. 列印當前資訊,遇到sleep之後,cpu會執行其他的操作,此時釋放GIL鎖 # 3. 當釋放了GIL鎖之後,task2會立馬搶到GIL鎖,然後cpu執行 print('is running ....', current_thread().name) time.sleep(1) # 6. 列印end資訊之後,子執行緒執行完畢,釋放GIL鎖 print('ending1....') def task2(): # 4. 搶到GIL鎖之後,列印當前資訊,遇到sleep之後,cpu去執行其他的操作,此時會釋放GIL鎖 # 5. 此時task1睡眠完成之後,會立馬去搶GIL鎖,然後列印end資訊 print('is running ....', current_thread().name) time.sleep(1) print('ending2....') t1 = Thread(target=task1) t2 = Thread(target=task2) # 此時會去執行task1函式 # 1. 首先我要去搶GIL鎖,當搶到鎖之後進入task1函式 t1.start() t2.start()
什麼時候開多執行緒和多程序 需要有四個任務去處理 方案一:開啟四個程序 方案二:開啟四個執行緒
單核: 無論是哪種程式都應該使用方案二,因為單核情況下無論是程序還是執行緒都需要不停的切換執行,但是線上程的情況下可以減少開啟程序的開銷以及節省記憶體空間的使用。 多核: 1. I/O密集型,再多的核也解決不了I/O等待的問題,應該選擇方案二 2. 計算密集型,多核意味著平行計算,應該選擇方案一
計算密集型效能測試 from multiprocessing import Process from threading import Thread import time
def task1(): x = 1 for i in range(60000000): x += i def task2(): x = 1 for i in range(60000000): x += i if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) # p1 = Thread(target=task1) # p2 = Thread(target=task2) start = time.time() p1.start() p2.start() p1.join() p2.join() print(time.time() - start) # 結論 # 對於計算密集型的程式我們應該使用多程序來代替多執行緒。因為python中的多執行緒並不能利用多核實現真正的並行 # 使用多程序的結果 # 8.21526575088501 # 使用多執行緒的結果 # 12.482805252075195
I/O密集型效能測試 from threading import Thread import time
def task1(): time.sleep(3) def task2(): time.sleep(3) if __name__ == '__main__': p1 = Process(target=task1) p2 = Process(target=task2) # p1 = Thread(target=task1) # p2 = Thread(target=task2) start = time.time() p1.start() p2.start() p1.join() p2.join() print(time.time() - start) # 結論 # 對於I/O密集型的程式我們應該用多執行緒去代替多程序 # 使用多程序的結果 # 3.2341346740722656 # 使用多執行緒的結果 # 3.002285242080688
三. 程序池與執行緒池
程序池和執行緒池
池就是一個容器,程序池就是用來裝程序的容器,那麼執行緒池就是用來裝執行緒池的容器,為什麼我們需要程序池和執行緒池呢?因為在大部分的情況下由於計算機硬體條件的限制我們並不能無線的開啟程序或者執行緒,儘管執行緒的開銷很小,因此我們需要用一個容器來限制能夠開啟的最大程序數和執行緒數,從而保證我們的計算機可以正常的提供服務。
程序池的簡單使用
from concurrent.futures import ProcessPoolExecutor import os import random import time
def task1(x): print('%s is running..' % os.getpid()) time.sleep(random.randint(1,3))
if name == 'main': # 建立一個程序池,程序池的大小可以通過引數進行傳遞, 如果不指定,預設是cpu的核數 process_pool = ProcessPoolExecutor(4) # 當執行完submit之後,就會額外的創建出4個程序,用來執行任務 # 函式task1的引數直接在submit中輸入就可以傳參 process_pool.submit(task1, 1) time.sleep(30)
在cmd中執行輸入命令 【tasklist |findstr pyt】 會發現出現了5個python程序,其中有一個是主執行緒,另外四個就是程序池內的程序數。
- 執行緒池的簡單使用
執行緒池的使用和程序池是一樣的,只是匯入的名稱不一樣。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import os
import random
import time
def task1(x):
print('%s is running..' % x)
time.sleep(random.randint(1,3))
if __name__ == '__main__':
# 建立一個執行緒池,執行緒池的大小可以通過引數進行傳遞, 如果不指定,預設是cpu的核數 * 5
process_pool = ThreadPoolExecutor(4)
for i in range(20):
# 當第一次執行submit之後,就會額外的創建出4個執行緒等待著執行任務
# 因此當我們執行了程式碼之後就會看到一下打印出了四行內容,之後就是執行完一個任務再進行下一個任務,最多的執行緒數是5個
# 其中有一個執行緒是主執行緒
process_pool.submit(task1, i)
# time.sleep(2)
四.同步vs非同步 阻塞vs非阻塞
- 阻塞和非阻塞 阻塞和非阻塞描述的是程式的一種執行狀態 阻塞(阻塞態) 遇到I/0之後,程式在原地等待I/0,並釋放cpu資源 非阻塞(就緒態或者執行態): 沒有遇到I/0,或者通過某種方式即便是程式遇到I/0也不會停在原地,而是去執行其他的操作,儘可能多的使用cpu的資源。
同步呼叫和非同步呼叫 同步和非同步描述的是程式執行的方式 同步呼叫 提交完任務之後就在原地進行等待,直到任務執行完畢並拿到了返回值,才會去執行下一行程式碼。 非同步呼叫 提交完任務之後不會在原地進行等待,直接執行下一行程式碼,結果可以通過非同步回撥得到。
同步呼叫簡單的實現過程 from concurrent.futures import ThreadPoolExecutor import time import random
def task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此處可以返回一個物件,物件有一個result屬性可以讓我們獲得返回值 obj = thread_pool.submit(task, i) # 因為有了result,所以此時的程式就是一個同步呼叫的方式,我們必須等待任務一個一個完成並且拿到返回值之後才會繼續執行迴圈 print(obj.result())
非同步呼叫簡單的實現過程 from concurrent.futures import ThreadPoolExecutor import time import random
def task(x): print('%s is running' % x) time.sleep(random.randint(1, 3)) return x ** 2 thread_pool = ThreadPoolExecutor(4) obj_l = [] for i in range(20): # 此處可以返回一個物件,物件有一個result屬性可以讓我們獲得返回值 # 此時是非同步呼叫方式,因為在所有的任務建立的時候,我們並不需要去等待結果,就可以直接去執行下一次迴圈 obj = thread_pool.submit(task, i) obj_l.append(obj) # 就相當於之前的的join等待執行緒結束 # 因為現在是一個執行緒池,所以我們不能單純的使用join去等待執行緒結束,我們還需要close將目前的任務先鎖定住 # 其實shutdown內部就是實現了先鎖定任務,然後等待所有任務執行完畢的過程 thread_pool.shutdown() # 當所有的任務結束之後,我們就可以通過返回的物件列表中隨便檢視其中的值 print(obj_l[0].result())
五. 非同步+回撥機制
案例:寫一個簡單的爬蟲案例,來詳細的分析一下非同步和回撥機制
首先,我們先以多程序的方式寫一個同步的程式,用來爬取網頁上的資訊
import requests
import time
import os
import random
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""獲取網頁的資訊"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模擬下載時間
if response.status_code == 200:
return response.text
def parse(data):
"""解析資料"""
time.sleep(0.2) # 模擬解析時間
print('%s 解析長度為%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 建立一個程序池,設定程序池的數量為4
pool = ProcessPoolExecutor(4)
# 這是我們需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# 每執行一個任務,都通過result去獲得相應的資料,然後通過parse去解析資料
data = pool.submit(get, url).result()
parse(data)
問題:我們發現雖然說能夠實現基本的功能,但是太慢了,每次都要等待一個任務全部完成獲得返回值之後才會去執行下面的程式碼,為了提升效率,我們考慮可以把同步的方式轉換成非同步。因此我們需要將name裡面的內容轉換成下面的樣子。
if __name__ == '__main__':
# 建立一個程序池,設定程序池的數量為4
pool = ProcessPoolExecutor(4)
# 這是我們需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
obj_l = []
for url in urls:
obj = pool.submit(get, url)
obj_l.append(obj)
# 等待程序池內的任務全部完成之後才去執行下面的程式碼
pool.shutdown()
# 此時url的內容都已經下載完成並且儲存在物件obj_l列表中,我們通過parse就可以解析了
for obj in obj_l:
parse(obj.result())
問題: 雖然說效率有所提升但是依然存在一些問題
- 解析過程要等待所有的下載任務執行完成之後才能進行解析
- 解析資料是序列的,如果一個解析過程很慢,就會大大的降低整個程式的效率
基於上面的問題,我們可以將解析過程放在get函式裡面,在一個任務下載結束之後就會立馬的進行解析資料,可以解決問題1,而且對於解析資料而言都是通過下載資料相同的程序進行解析的,可以解決第二個問題。因此我們的程式碼可以修改成下面這個樣子。
import requests
import time
import os
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""獲取網頁的資訊"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模擬下載時間
if response.status_code == 200:
# 返回值我們也不需要,直接去呼叫parse解析就可以了
parse(response.text)
def parse(data):
"""解析資料"""
time.sleep(0.2)
print('%s 解析長度為%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 建立一個程序池,設定程序池的數量為4
pool = ProcessPoolExecutor(4)
# 這是我們需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
pool.submit(get, url)
問題: 這樣寫的話我們會發現下載內容和解析內容的程式碼被寫在一塊了,而且還是被同一個程序執行,這就和我們之前所講的生產者和消費者模型相悖了,如果我們想讓兩個函式解耦合,我們可以在get函式中將結果返回回來,然後在主程序中接收,並執行解析函式。此時我們就需要用到回撥函數了。
import requests
import time
import os
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""獲取網頁的資訊"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模擬下載時間
if response.status_code == 200:
# 返回值我們也不需要,直接去呼叫parse解析就可以了
return response.text
def parse(data):
"""解析資料"""
time.sleep(0.2)
print('%s 解析長度為%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 建立一個程序池,設定程序池的數量為4
pool = ProcessPoolExecutor(4)
# 這是我們需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# 通過submit將物件obj扔到程序池中
obj = pool.submit(get, url)
# 通過函式add_done_callback函式將傳遞的函式parse繫結到物件上obj
# 當物件obj所執行的任務一旦完成並獲得一個返回值的時候就會自動的去呼叫parse函式,並將物件當做引數傳遞進去
# 這種方式和get方法中直接解析資料所實現的效果都是差不多的,只是說解決了耦合的問題,從某種意義上講,有時候解析的時間還要更長一點
# 注意: parse函式是全部都是通過主程序進行呼叫的,這也就解釋了回撥的意思,我來呼叫你,結果給我之後就應該就由我來解析
obj.add_done_callback(parse)
多執行緒和多程序的方式都是一樣的,唯一不同的地方就在於回撥函式是哪個執行緒空閒哪個執行緒去執行回撥函式
六. 執行緒queue, Event
- Queue和程序的佇列一樣,先進先出 # 這種佇列是先進先出 q1 = queue.Queue(3) q1.put(1) q1.put('2') q1.put([234]) # q1.put({5: 6}) # 此時會阻塞,等到取出去一個之後才能新增到佇列中 print(q1.get()) # 結果1
- LifoQueue 堆疊,先進後出 # 先進後出,堆疊 q1 = queue.LifoQueue(3) q1.put(1) q1.put('2') q1.put([234]) # q1.put({5: 6}) # 此時會阻塞,等到取出去一個之後才能新增到佇列中 print(q1.get()) # 結果是[234]
- PriorityQueue 優先順序佇列,數字越小優先順序越大 # 優先順序佇列,傳入的引數是一個元組,第一個值為優先順序,必須是int,第二個是要壓入佇列中的值 # 優先順序越小越優先,如果優先順序相等,比較後面的值,越小越優先 q1 = queue.PriorityQueue(3) q1.put((1, '123')) q1.put((1, '456')) q1.put((2, '789')) # q1.put((2, {5: 6})) # 此時會阻塞,等到取出去一個之後才能新增到佇列中 print(q1.get()) # 結果(1, '123')
Event 用來程序之間協同工作的 簡單的event的使用,用來提前連線伺服器判斷伺服器是否可連 from threading import Thread, Event, current_thread import time
# 建立一個event物件 event = Event() def check(): """首先啟動一個執行緒嘗試連線請求""" print('%s 嘗試伺服器是否可以連線' % current_thread().name) time.sleep(3) # 模擬連線請求持續了秒 event.set() # 當連線請求成功之後設定事件 def connection(): """當check檢測通過之後再開始連線""" print('%s 嘗試連線' % current_thread().name) event.wait() # 當event.set之後才會執行下面的程式碼,否則阻塞 print('%s 連線成功' % current_thread().name) t1 = Thread(target=connection) t2 = Thread(target=connection) t3 = Thread(target=connection) t4 = Thread(target=check) t1.start() t2.start() t3.start() t4.start()
可以使用is_set功能模擬嘗試三次連線之後斷開連線的操作,將connection函式改成下面這個樣子 def connection(): """當check檢測通過之後再開始連線""" # 如果沒有設定,也就是說check還沒有來連線上伺服器,就一直嘗試連線 count = 1 while not event.is_set(): if count > 3: print('您嘗試連線的次數過多,請稍後重試') return time.sleep(0.8) print('%s 嘗試連線' % current_thread().name) count += 1 # event.wait() # 當event.set之後才會執行下面的程式碼,否則阻塞 print('%s 連線成功' % current_thread().name)