2022.4.20程序補充及執行緒相關理論、方法概念
2022.4.20程序補充及執行緒相關理論、方法概念
- 訊息佇列
- IPC機制(程序間通訊)
- 生產者消費者模型
- 執行緒理論(重要)
- 開設執行緒的兩種方式
- 執行緒實現TCP服務端併發
- 執行緒join方法
- 執行緒間資料共享
- 執行緒物件屬性及方法
- 守護執行緒
- GIL全域性直譯器鎖
一、訊息佇列
ps:由於目前的知識儲備還不夠直接學習訊息佇列 所以先學習內建佇列,以後我們會直接使用別人封裝好的訊息佇列 實現各種資料傳輸
from multiprocessing import Queue q = Queue(5) # 定義一個佇列並設定長度為5 1.往佇列放資料 q.put(111) # 朝佇列中存放資料 q.put(222) q.put(333) print(q.full()) # False 判斷佇列是否滿了 q.put(444) q.put(555) print(q.full()) # True 佇列已滿 此時再往佇列放資料就會超出最大長度,原地阻塞等待隊列出現空位 2.從佇列取資料 print(q.get()) # 從佇列拿資料 print(q.get()) print(q.empty()) # False 判斷佇列是否空了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) # True 佇列已空 此時佇列沒有資料,繼續獲取進入阻塞狀態,等待佇列中給值 print(q.get_nowait()) # 佇列裡沒有值直接報錯 """ full() empty() get_nowait() 上述方法能否在併發的場景下精準使用??? 不能用!!! 之所以介紹佇列是因為它可以支援程序間資料通訊 """
二、IPC機制(程序間通訊)
1.主程序與子程序資料互動
2.兩個子程序資料互動
本質:不同記憶體空間中的程序資料互動
from multiprocessing import Process, Queue def producer(q): q.put('666') # 往佇列放資料 def consumer(q): print(q.get()) # 取佇列資料 if __name__ == '__main__': q = Queue() # 設定佇列 p1 = Process(target=producer, args=(q, )) p2 = Process(target=consumer, args=(q,)) p1.start() p2.start() # q.put(123) # 主程序往佇列中存放資料123 print('主程序') # 結果: 主程序 666
這樣不同程序都在操作和使用這個佇列,實現程序間通訊
三、生產者消費者模型
1、概念
生產者:負責生產/製作資料
消費者:負責消費/處理資料
"""
比如在爬蟲領域中:
會先通過程式碼爬取網頁資料(爬取網頁的程式碼就可以稱之為是生產者)
之後針對網頁資料做篩選處理(處理網頁的程式碼就可以稱之為消費者)
"""
如果使用程序來演示
除了有至少兩個程序之外 還需要一個媒介(訊息佇列)
以後遇到該模型需要考慮的問題其實就是供需平衡的問題
即生產力與消費力要均衡
2、程式碼演示
from multiprocessing import Process, Queue,JoinableQueue import time import random def producer(name, food, q): for i in range(5): data = f'{name}生產了{food}{i}' print(data) time.sleep(random.randint(1,3)) # 模擬產生過程 q.put(data) # 將資料放入佇列 def consumer(name,q): while True: food = q.get() time.sleep(random.random()) print(f'{name}吃了{food}') q.task_done() # JoinableQueue佇列,每次去完資料必須給佇列一個反饋 ''' 問題:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步,進入阻塞態。 解決方式: 普通方法:無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈 JoinableQueue 方法: 生產者生產的每個資料上都做一個標記,消費者每 q.get() 取一個值,都用 q.task_done() 標記一次,q.join()感知佇列中的資料全部處理完畢,再最終結束 ''' if __name__ == '__main__': # q = Queue(),這裡不使用普通佇列 q = JoinableQueue() # 使用JoinableQueue方法 p1 = Process(target=producer, args=('大廚jason', '韭菜炒蛋', q)) p2 = Process(target=producer, args=('老闆kevin', '祕製小漢堡', q)) c1 = Process(target=consumer, args=('濤濤', q)) c2 = Process(target=consumer, args=('龍龍', q)) c1.daemon = True c2.daemon = True p1.start() p2.start() c1.start() c2.start() # 生產者生產完所有資料之後 往佇列中新增結束的訊號 p1.join() p2.join() """佇列中其實已經自己加了鎖 所以多程序取值也不會衝突 並且取走了就沒了""" q.join() # 等待佇列中資料全部被取出(一定要讓生產者全部結束才能判斷正確) """執行完上述的join方法表示消費者也已經消費完資料了,相當於設定一道坎判斷佇列是否被取完,取完程式結束,守護程序直接結束,不會停留在阻塞態"""
四、執行緒理論
1、什麼是執行緒
程序:資源單位(在記憶體空間中申請一塊記憶體)
執行緒:執行單位(在程序的記憶體中執行任務,資源從程序空間取)
eg:程序相當於車間,執行緒相當於車間裡面的流水線
一個程序至少有一個執行緒!
2、為什麼要有執行緒?
答:開設執行緒的消耗遠遠小於程序
開程序:申請記憶體空間--->拷貝程式碼
開執行緒:無需申請記憶體、拷貝程式碼,程序內多個執行緒資料共享
總之,開多程序浪費記憶體空間及資源,執行緒解決了這個問題
五、開設執行緒的兩種方法
其實開程序和執行緒的方法幾乎是一樣的,只不過關鍵字不同
程式碼演示:
from threading import Thread
import time
def task(name):
print(f'{name}開始吃飯')
time.sleep(3)
print(f'{name}吃飽了')
# 建立執行緒無需在__main__下面編寫 但是為了統一 還是習慣在子程式碼中寫
t = Thread(target=task, args=('jason',))
t.start()
print('主執行緒')
# 結果(建立執行緒開銷很小,幾乎一瞬間就可以建立,因此執行很快):
jason開始吃飯
主執行緒
Jason吃飽了
# 用類做執行緒物件
class MyThread(Thread):
def __init__(self, username):
self.username = username
super().__init__()
def run(self):
print(f'{self.username}開始吃飯')
time.sleep(3)
print(f'{self.username}吃飽了')
t = MyThread('jason')
t.start()
print('主執行緒')
# 結果:
jason開始吃飯
主執行緒
Jason吃飽了
六、執行緒實現TCP服務端的併發
注意區分開設程序和執行緒的本質區別
import socket
from throading import Throad
# 這裡不用封裝函式,因為使用執行緒做互動
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen()
def talk(sock):
data = sock.recv(1024) # 接收資訊
print(data.decode('utf8'))
sock.send(data.upper()) # 傳送資訊
while True:
sock, addr = server.accept()
# 使用迴圈,每來一個客戶端就建立一個執行緒做資料互動
t = Throad(target=talk, args=(sock,))
t.start()
因此可以發現,使用執行緒實現併發更為好一點,因為效率高且節省資源
七、執行緒join方法
執行緒的join方法和程序相差無幾,都是讓子執行緒執行完畢之後再往下執行
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t = Thread(target=task, args=('jason', ))
t.start()
t.join() # 主執行緒程式碼等待子執行緒程式碼執行完畢之後再往下執行
print('主執行緒')
"""
主執行緒為什麼要等著子執行緒結束才會結束整個程序
因為主執行緒結束也就標誌著整個程序的結束 要確保子執行緒執行過程中所需的各項資源
"""
八、同一程序內多個執行緒資料共享
from threading import Thread
money = 10000000000
def task():
global money
money = 1
t = Thread(target=task)
t.start()
t.join()
print(money)
思考:執行緒更改程序內資料,資料也會被更改
# 1
九、執行緒物件屬性和方法
1、同一程序下多個執行緒的程序號一致
2、如何統計程序下活躍的執行緒數
active_aount()
3、獲取執行緒的名字
1.current_throad().name
MainThread # 主執行緒
Thread-1、Thread-2 # 子執行緒
2.self.name # 類物件獲取執行緒名
十、守護執行緒
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t1 = Thread(target=task, args=('jason',))
t2 = Thread(target=task, args=('kevin',))
t1.daemon = True # 守護執行緒
t1.start()
t2.start()
print('主執行緒')
# 結果:
jason is running
kevin is running
主執行緒
kevin is over
jason is over
# 注意,在這裡,這個守護程序其實是起不了作用的,因為這裡有兩個子執行緒,其中一個是守護執行緒,但是主執行緒必須等待所有子執行緒執行結束才會真正結束,因為可能會有一些需要使用的資料,所以這裡守護執行緒是起不了作用的,除非兩個都是守護執行緒
十一、GIL全域性直譯器鎖
回顧:
python直譯器的類別有很多:
Cpython Jpython Ppython
垃圾回收機制:
應用計數、標記清除、分代回收
ps:GIl為純理論,不影響程式設計,只不過面試的時候可能會被問到
官方文件:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
解釋:
GIL只存在於CPython直譯器中,不是python的特徵
GIL是一把互斥鎖用於阻止同一個程序下的多個執行緒同時執行
原因是因為CPython直譯器中的垃圾回收機制不是執行緒安全的
強調:
GIL是加在CPython直譯器上面的互斥鎖,
同一個程序下的多個執行緒要想執行必須先搶GIL鎖,所以同一個程序下多個執行緒肯定不能同時執行,即:無法利用多核優勢
優劣勢:
優勢:保證資料安全,不會被垃圾回收機制回收
劣勢:無法發揮多核優勢,但是可以通過開設多程序彌補
實際應用:
多工為IO密集型:多執行緒更有優勢,消耗資源更少(多道技術:切換+儲存狀態)
多工為計算密集型:可以使用多程序,CPU越多越好