1. 程式人生 > 其它 >2022.4.21死鎖現象、訊號量、程序池、執行緒池、協程

2022.4.21死鎖現象、訊號量、程序池、執行緒池、協程

2022.4.21死鎖現象、訊號量、程序池、執行緒池、協程

  • GIL與普通互斥鎖的區別
  • 驗證多執行緒作用
  • 死鎖現象
  • 訊號量
  • event事件
  • 程序池與執行緒池
  • 協程

一、GIL與普通互斥鎖的區別

1、先程式碼驗證GIL的存在

from threading import Thread
import time
money = 100
def task():
    global money
    money -= 1
for i in range(100):  # 建立100個執行緒
    t = Thread(target=task)
    t.start()
print(money)  # 0

分析:結果為0,說明各個執行緒搶到全域性鎖才回去執行,執行完再交接給下一位,最後都進行了資料修改

2、程式碼驗證不同資料加不同鎖

from threading import Thread, Lock
import time
money = 100
mutex = Lock()  # 互斥鎖
def task():
    global money
    mutex.acquire()  # 搶鎖
    tmp = money
    time.sleep(0.1)
    money = tmp - 1
    mutex.release()  # 放鎖
    
t_list = []  # 存放執行緒的列表
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()  # 給所有執行緒加join方法,確保所有的執行緒執行完畢
print(money)  # 0
注意:如果這裡沒加互斥鎖mutex的時候,結果為99,為什麼?
分析:因為如果沒有互斥鎖保證它獨立執行完再執行下一個的話,每個執行緒獲取到的money都是100,tmp-1都是99,那麼最終結果也就是99

"""
GIL是一個純理論知識 在實際工作中根本無需考慮它的存在

GIL作用面很窄 僅限於直譯器級別 
	後期我們要想保證資料的安全應該自定義互斥鎖(使用別人封裝好的工具)
"""

3、搶鎖放鎖簡便寫法

mutex = lock()
with mutex:
    加鎖的程式碼

二、驗證多執行緒的作用

1、計算密集型多程序和多執行緒對比

from threading import Thread
from multiprocessing import Process
import os
import time

def work():
    res = 1
    for i in range(1, 100000):
        res *= i
if __name__ == '__main__':
    print(os.cpu_count())  # 8 檢視當前計算機CPU個數
    start_time = time.time()
    # 多程序
    p_list = []
    for i in range(8):
        p = Process(target=work)
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
#   # 多執行緒
    # t_list = []
    # for i in range(8):
    #     t = Thread(target=work)
    #     t.start()
    #     t_list.append(t)
    # for t in t_list:
    #     t.join()
#   print('總耗時:%s' % (time.time() - start_time))  # 總耗時:18.78506636619568
    print('總耗時:%s' % (time.time() - start_time))  # 總耗時:4.876669883728027

對比程序和執行緒在我計算機上的表現,可以得出結論:

在多核計算機下,計算密集型任務使用多程序要比多執行緒更有優勢

2、IO密集型多程序和多執行緒對比

直接複製上面程式碼,把work改成IO操作,程序執行緒各起100個:

def work():
    time.sleep(2)   # 模擬純IO操作
...
# 多程序
總耗時:3.705044746398926
# 多執行緒
總耗時:2.0191071033477783

結論:IO密集型啟用多執行緒比多程序效率高

三、死鎖現象

ps:鎖就算掌握瞭如何搶 如何放 也會產生死鎖現象

程式碼演示:

from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()
    def f1(self):
        mutexA.acquire()  # 搶A鎖
        print(f'{self.name}搶到了A鎖')
        mutexB.acquire()  # 搶B鎖
        print(f'{self.name}搶到了B鎖')
        mutexB.release()  # 放B鎖
        mutexA.release()  # 放A鎖
    def f2(self):
        mutexB.acquire()  # 搶B鎖
        print(f'{self.name}搶到了B鎖')
        time.sleep(2)
        mutexA.acquire()  # 搶A鎖
        print(f'{self.name}搶到了A鎖')
        mutexA.release()  # 放A鎖
        mutexB.release()  # 放B鎖
for i in range(10):  # 建立10個執行緒
    t = MyThread()
    t.start()
# 結果:
Thread-1搶到了A鎖
Thread-1搶到了B鎖
Thread-1搶到了B鎖
Thread-2搶到了A鎖
# 然後就產生阻塞現象了,因為最後執行緒2搶到A鎖然後取搶B鎖時,B鎖還線上程1手裡,然而執行緒1下面也要搶A鎖,兩者都進入阻塞
結論:
鎖不能輕易使用並且以後我們也不會在自己去處理鎖都是用別人封裝的工具

四、訊號量

訊號量在不同知識體系中,展現出來的功能時不一樣的,

eg:

在併發程式設計中:訊號量是把互斥鎖

在djando框架中:訊號量時達到某條件自動觸發特定功能

程式碼演示:

"""
如果將自定義互斥鎖比喻成是單個廁所(一個坑位)
那麼訊號量相當於是公共廁所(多個坑位)
"""
from threading import Thread, Semaphore
import time
import random

sp = Semaphore(5)  # 建立一個有五個坑位(帶門的)的公共廁所

def task(name):
    sp.acquire()  # 搶鎖
    print('%s正在蹲坑' % name)
    time.sleep(random.randint(1, 5))
    sp.release()  # 放鎖
for i in range(1, 31):
    t = Thread(target=task, args=('傘兵%s號' % i, ))
    t.start()

分析:上面程式碼建立帶有五個位置的訊號量,然後建立30個執行緒,會發現,程式碼結果會直接先執行5個執行緒,然後陸續執行接下來的執行緒,說明,訊號量是設定好的5個位置,前面的位置騰出來後後面的執行緒再進去,以此類推

五、event事件(執行緒間通訊)

作用:子執行緒的執行可以由其他子執行緒決定或者干涉

Event是執行緒間通訊的機制之一:一個執行緒傳送一個event訊號,其他的執行緒則等待這個訊號。常用在一個執行緒需要根據另外一個執行緒的狀態來確定自己的下一步操作的情況。

Event原理:事件物件管理一個內部標誌,通過set()方法將其設定為True,並使用clear()方法將其設定為False。wait()方法阻塞,直到標誌為True。該標誌初始為False。

程式碼驗證:

from threading import Thread, Event
import time

event = Event()  # 類似於造了一個紅綠燈

def light():
    print('紅燈亮,原地等待')
    time.sleep(3)
    print('綠燈行,衝過馬路')
    event.set()  # True
def car(name):
    print('%s正在等紅燈' % name)
    event.wait()  # 阻塞,等待為True時退出阻塞
    print('%s加油門,開始飆車' % name)
    
t = Thread(target=light)  # 建立1個執行緒light,模擬紅綠燈
t.start()
for i in range(10):  # 建立10個執行緒car,模擬汽車司機
    t = Thread(target=car, args=('汽車司機%s' % i,))
    t.start()
# 結果:
紅燈亮,原地等待
汽車司機0正在等紅燈
...                 # 省略
汽車司機9正在等紅燈
綠燈行,衝過馬路
汽車司機1加油門,開始飆車
...                 # 省略
汽車司機7加油門,開始飆車

# 拓展:
event.clear()  # 將事件設定為False
is_set()  # 當且僅當內部標誌為True時返回True

可以看出,wait()造成阻塞,等待set()執行,然後結束阻塞

六、程序池與執行緒池(重點)

引入:

​ 服務端必備的三要素
​ 1.24小時不間斷提供服務
​ 2.固定的ip和port
​ 3.支援高併發

回顧:

​ TCP服務端實現併發
​ 多程序:來一個客戶端就開一個程序(臨時工)
​ 多執行緒:來一個客戶端就開一個執行緒(臨時工)

問題:

​ 計算機硬體是有物理極限的,我們不可能無限制的建立程序和執行緒

措施:

池:

​ 保證計算機硬體安全的情況下提升程式的執行效率

程序池:

​ 提前建立好固定數量的程序,後續反覆使用這些程序(合同工)

執行緒池:

​ 提前建立好固定數量的執行緒,後續反覆使用這些執行緒(合同工)

強調:

​ 如果任務超出了池子裡面的最大程序或執行緒數,則原地等待

​ 程序池和執行緒池其實降低了程式的執行效率,但是保證了硬體的安全!!!

程式碼演示(掌握):

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecytor

# 執行緒池
pool = ThreadPoolExecutor(5)  # 執行緒池執行緒數預設時CPU個數的5倍,也可以自定義,這個程式碼執行之後就會立刻建立5個等待工作的執行緒

def task(n):
    time.sleep(2)
    print(n)
    return '任務的執行結果:%s'%n**2

def func(*args, **kwargs):
    # print(args, kwargs)
    print(args[0].result())

for i in range(20):
    # res = pool.submit(task, i)  # 朝執行緒池中提交任務(非同步)及引數
    # print(res.result())  # 同步提交(獲取任務的返回值)
    '''這樣效率太慢,不應該自己主動等待結果 應該讓非同步提交自動提醒>>>:非同步回撥機制'''
# 非同步回撥機制:非同步提交自動提醒add_done_callback(func)    
pool.submint(task, i).add_done_callback(func)
"""add_done_callback只要任務task有結果了,就會自動呼叫括號內的函式func處理,把task任務當作引數傳入func"""

# 程序池
pool = ProcessPoolExecutor(5)  # 程序池程序數預設是CPU個數 也可以自定義
'''上面的程式碼執行之後就會立刻建立五個等待工作的程序'''
pool.submit(task, i).add_done_callback(func)

七、協程

程序:資源單位
執行緒:執行單位
協程:單執行緒下實現併發

程式碼演示:

from gevent import monkey;monkey.patch_all()  # 固定編寫 用於檢測所有的IO操作
from gevent import spawn
import time

def play(name):
    print('%s play 1' % name)
    time.sleep(5)
    print('%s play 2' % name)

def eat(name):
    print('%s eat 1' % name)
    time.sleep(3)
    print('%s eat 2' % name)
start_time = time.time()
g1 = spawn(play, 'jason')  # 協程g1
g2 = spawn(eat, 'jason')  # 協程g2
g1.join()  # 等待檢測任務執行完畢
g2.join()  # 等待檢測任務執行完畢
print('總耗時:', time.time() - start_time)  # 5.00609827041626

基於協程實現TCP服務端併發

from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket

def communication(sock):
    while True:
        data = sock.recv(1024)  # IO操作
        print(data.decode('utf8'))
        sock.send(data.upper())
        
def get_server():
    server = socket.socket()
    server.bind(('127.0.0.1', 8080))
    server.listen(5)
    while True:
        sock, addr = server.accept()  # IO操作
        spawn(communication, sock)

g1 = spawn(get_server)
g1.join()