1. 程式人生 > 實用技巧 >執行緒與執行緒鎖---python版本(附帶執行緒鎖例項)

執行緒與執行緒鎖---python版本(附帶執行緒鎖例項)

程序和執行緒

現代的作業系統基本上都是支援多工的作業系統
那?什麼叫多工呢,簡單而言,就是作業系統可以同時執行多個任務。

就像是上圖這種,可以同時執行多個任務,當然還有很多的後臺應用也在默默的執行著,只是沒有顯示在桌面而已

這裡就要談到單核和多核CPU

單核CPU在執行任務時,通常都是任務1執行一個單位時間,任務2執行一個單位時間,,再切換到任務3,任務4.... 這樣反覆切換。表面而言,這些任務切換時都是交替執行的,但是,由於CPU執行速度實在太快,肉眼感知就像所有的任務都是在同時執行一樣。

多核CPU真正的並行執行多工只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,作業系統也會自動把很多工輪流排程到每個核心上執行。

對於作業系統來說,一個任務就是一個程序(Process)。每個程序至少要幹一件事,所以,一個程序至少有一個執行緒,複雜的程序可以有多個執行緒,多個執行緒可以同時執行,多執行緒的執行方式和多程序是一樣的,也是由作業系統在多個執行緒之間快速切換,讓每個執行緒都短暫地交替執行,看起來就像同時執行一樣。當然,真正地同時執行多執行緒需要多核CPU才可能實現。

如果我們要同時執行多個任務怎麼辦?

有兩種解決方案:

第一種是啟動多個程序,每個程序雖然只有一個執行緒,但多個程序可以一塊執行多個任務。

第二種方法是啟動一個程序,在一個程序內啟動多個執行緒,這樣,多個執行緒也可以一塊執行多個任務。

當然還有第三種方法,就是啟動多個程序,每個程序再啟動多個執行緒,這樣同時執行的任務就更多了,當然這種模型更復雜,實際很少採用。

總結一下就是,多工的實現有3種方式:

  • 多程序模式;
  • 多執行緒模式;
  • 多程序+多執行緒模式。`

小結

  • 執行緒是最小的執行單元,而程序由至少一個執行緒組成。如何排程程序和執行緒,完全由作業系統決定,程式自己不能決定什麼時候執行,執行多長時間。

  • 多程序和多執行緒的程式涉及到同步、資料共享的問題,編寫起來更復雜。

程序

先說兩句:什麼是程序?有什麼用?——ok,那我問你,你能一手畫圓一手畫方嗎?——我猜不能。但計算機就不一樣了,一邊繪製正方體一邊繪製球體都是小case(螢幕上自動繪製圖形),這是因為計算機啟動了另一個"大腦"來處理另一個任務,即兩個“大腦”分別同時畫兩個圖形 效率X2!我們之前的寫程式都是計算機一個“大腦”在工作!ok,那怎麼啟動計算機其他的大腦呢?——啟動另一個程序就可以了!

Unix/Linux作業系統中,提供了一個fork()系統呼叫,它非常特殊。普通的函式呼叫,呼叫一次,返回一次,但是fork()呼叫一次,返回兩次,因為作業系統自動把當前程序(稱為父程序)複製了一份(稱為子程序),然後,分別在父程序和子程序內返回。

子程序永遠返回0,而父程序返回子程序的ID
由是,一個父程序可以fork出很多子程序,所以,父程序要記下每個子程序的ID,而子程序只需要呼叫getppid()就可以拿到父程序的ID
python的os模組中就封裝了常見的系統呼叫,其中就包括fork

Windows中沒有fork呼叫,所以無法採用fork形式,因為python是跨平臺的,所以,就有了multiprocessing模組,此模組就是支援跨平臺版本的多程序模組

multiprocessing模組提供了一個Process類來代表一個程序物件,下面的例子演示了啟動一個子程序並等待其結束

from multiprocessing import Process
import os

# 子程序要執行的程式碼
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

執行結果如下:

Parent process 928.
Child process will start.
Run child process test (929)...
Process end.

建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,用start()方法啟動,這樣建立程序比fork()還要簡單。

join()方法可以等待子程序結束後再繼續往下執行,通常用於程序間的同步。

簡化如上操作見下
1、建立程序:

import multiprocessing
import time

def action(a, b):  # 待會兩個程序要執行的任務↓
    for i in range(30):  # 迴圈30次
        print(a, ' ', b)
        time.sleep(0.1)  # 等待0.1s

if __name__ == '__main__':  # 這行程式碼很重要,新建程序的時候都加上它!!原因不用管(我也不知道233)

    jc1 = multiprocessing.Process(target=action, args=('程序一', 0))  # 準備建立一個程序:multiprocessing.Process()
    jc2 = multiprocessing.Process(target=action, args=('程序二', 1))  # 再準備建立一個新程序,這是基本格式記住←
# 必要引數target:指定程序要執行的任務(這裡是執行函式 action),必要引數args:直譯成中文就是'引數',顧名思義就是前面target的引數,即action的引數,注意args是個元組,所以args後的引數寫成tuple元組格式。直接寫target('程序一',0)一定報錯的

    jc1.start()  # 將蓄勢待發的jc1程序正式啟動!!
    jc2.start()  # 同上...

    jc1.join()  # 等待程序jc1將任務執行完...
    jc2.join()  # ...
    print('jc1,jc2任務都已執行完畢')

    jc1.close()  # 徹底關閉程序jc1
    jc2.close()  # ...
     #輸出結果是兩個程序同時且連續列印0、1

2、Pool:程序池,可以啟動大量的子程序,或者批量建立子程序

import time
import os

def action1(a, b=50):
    for i in range(b):
        print(a, os.getpid(), ' ', i)  # os.getpid(): pid簡單來說就是每個程序的“身份證”
        time.sleep(0.1)

if __name__ == '__main__':  # 還要新增這行,否則可能出現異常

    ci = Pool(3)  # 建立一個程序池,容量為3個程序
    ci.apply_async(action1, args=('程序一',))  # 啟動第一個子程序...
    ci.apply_async(action1, args=('程序二', 50))  # 和普通程序的啟動方式有很大不同仔細看
    ci.apply_async(action1, args=('程序三', 60))  # Pool的最基本格式記住←
# 注意:程式現在有4個程序在執行:上面的三個子程序 和一個最為核心的:主程序

    ci.close()  # 關閉程序池(但池子內已啟動的子程序還會繼續進行)
    ci.join()  # 等待程序池內的所有子程序完畢
    print('比如說這最後的一行輸出就是主程序執行任務打印出來的')
    #主程序(父程序)全程幹了什麼?建立程序池、啟動子程序、關閉程序池、等待子程序完畢、列印最後一行

程式碼解讀:
對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續新增新的Process了

3、程序間的通訊:

Process之間肯定是需要通訊的,作業系統提供了很多機制來實現程序間的通訊。Python的multiprocessing模組包裝了底層的機制,提供了Queue、Pipes等多種方式來交換資料

import multiprocessing

def foo(aa):
    ss = aa.get()  # 管子的另一端放在子程序這裡,子程序接收到了資料
    print('子程序已收到資料...')
    print(ss)  # 子程序打印出了資料內容...

if __name__ == '__main__':  # 要加這行...

    tx = multiprocessing.Queue()  # 建立程序通訊的Queue,你可以理解為我拿了個管子來...
    jc = multiprocessing.Process(target=foo, args=(tx,))  # 建立子程序
    jc.start()  # 啟子子程序

    print('主程序準備傳送資料...')
    tx.put('有內鬼,終止交易!')  # 將管子的一端放在主程序這裡,主程序往管子裡丟入資料↑
    jc.join()
#這種方法可以實現任意程序間的通訊,這裡寫的是主、子程序間的通訊#

Windows上的程序見下圖

小結

  • 在Unix/Linux下,可以使用fork()呼叫實現多程序。

  • 要實現跨平臺的多程序,可以使用multiprocessing模組。

  • 程序間通訊是通過Queue、Pipes等實現的。

執行緒

多工可以由多程序完成,也可以由一個程序內的多執行緒完成

由於執行緒是作業系統直接支援的執行單元,因此,高階語言通常都內建多執行緒的支援

python中,提供了兩個模組:

_threadthreading_thread是低階模組,threading是高階模組,對_thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高階模組。
啟動一個執行緒就是把一個函式傳入並建立Thread例項,然後呼叫start()開始執行

import time, threading

# 新執行緒執行的程式碼:
def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

threading模組有個current_thread()函式,它永遠返回當前執行緒的例項
主執行緒例項的名字叫MainThread,子執行緒名字如果未命名,則會顯示成Thread-1Thread-2……

執行緒鎖

多執行緒和多程序最大的不同在於

  • 多程序中,同一個變數,各自有一份拷貝存在於每個程序中,互不影響
  • 多執行緒中,所有變數都由所有執行緒共享,所以,任何一個變數都可以被任何一個執行緒修改

因此,執行緒之間共享資料最大的危險在於多個執行緒同時改一個變數,把內容給改亂了

例如銀行存款

import time, threading

# 假定這是你的銀行存款:
balance = 0

def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(1000000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

然而實際結果卻是

8

這並沒有達到預期結果0
原因是因為高階語言的一條語句在CPU執行時是若干條語句,即使一個簡單的計算:

balance = balance + n

也分兩步:

  • 計算balance + n,存入臨時變數中;
  • 將臨時變數的值賦給balance。
    也就是可以看成:
x = balance + n
balance = x

究其原因,是因為修改balance需要多條語句,而執行這幾條語句時,執行緒可能中斷,從而導致多個執行緒把同一個物件的內容改亂了。
所以,我們必須確保一個執行緒在修改balance的時候,別的執行緒一定不能改。

那麼該如何確保balance計算正確呢?就要給change_it()上一把鎖,當某個執行緒開始執行change_it()時,我們說,該執行緒因為獲得了鎖,因此其他執行緒不能同時執行change_it(),只能等待,直到鎖被釋放後,獲得該鎖以後才能改。
由於鎖只有一個,無論多少執行緒,同一時刻最多隻有一個執行緒持有該鎖,所以,不會造成修改的衝突。
建立一個鎖就是通過threading.Lock()來實現:

balance = 0
lock = threading.Lock()

def run_thread(n):   # 上鎖的方式一
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()
# !/usr/bin/python3
# -*-coding:UTF-8-*-
# FileName: 完整例子

import threading

balance = 0
def change_it(n):
    '''先存後取,結果應該為0'''
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):  # 上鎖的方式二
    '''with Lock的作用相當於自動獲取和釋放鎖(資源)'''
    with lock:
        for i in range(100000):
            change_it(n)      # 放心地改吧
# def run_thread(n):  # 上鎖的方式三
#    for i in range(100000):
#        with lock:
#           change_it(n)
def main():
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f'total balance is : {balance}')

if __name__ == '__main__':
    main()

當多個執行緒同時執行lock.acquire()時,只有一個執行緒能成功地獲取鎖,然後繼續執行程式碼,其他執行緒就繼續等待直到獲得鎖為止。

獲得鎖的執行緒用完後一定要釋放鎖,否則那些苦苦等待鎖的執行緒將永遠等待下去,成為死執行緒。所以我們用try...finally來確保鎖一定會被釋放。

鎖的好處就是確保了某段關鍵程式碼只能由一個執行緒從頭到尾完整地執行

壞處當然也很多,首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行,效率就大大地下降了。
其次,由於可以存在多個鎖,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止

小結

  • 多執行緒程式設計,模型複雜,容易發生衝突,必須用鎖加以隔離,同時,又要小心死鎖的發生。
  • Python直譯器由於設計時有GIL全域性鎖,導致了多執行緒無法利用多核。多執行緒的併發在Python中就是一個美麗的夢。

例項

import time, threading

# 建立三個執行緒,利用執行緒鎖,操作同一個列表中的資料
# 執行緒一:給列表增加一個元素
# 執行緒二:給列表刪除一個元素
# 執行緒三:修改列表元素

L1 = ["python","javascript","PHP","Java"]
glock = threading.Lock()

def add_list():
    glock.acquire()
    time.sleep(2)
    num = input("please input a data to add:\n")
    L1.append(num)
    glock.release()
    print("子執行緒一:", L1)
    

def pop_list():
    glock.acquire()
    time.sleep(2)
    n = int(input("\n please input the number of list to pop:"))
    L1.pop(n)
    glock.release()
    print("子執行緒二:", L1)

def change_list():
    glock.acquire()
    time.sleep(2)
    print(L1)
    num = int(input("\n please input the number of list that you need change:\n"))
    strs = input("\n please input your data to change:\n")
    L1[num] = strs
    glock.release()
    print("子執行緒三:", L1)


if __name__ == '__main__':
    # for i in range(4):
    # 程序一:增加一個元素
    t1 = threading.Thread(target=add_list, name="執行緒一")    
    # 程序二:刪除一個元素
    t2 = threading.Thread(target=pop_list, name="執行緒二") 
    # 程序三:修改一個元素
    t3 = threading.Thread(target=change_list, name="執行緒三")
    print("執行緒 %s 正在執行中" % threading.current_thread().name)
    # 開啟執行緒
    t1.start()
    t2.start()
    t3.start()
    print('主執行緒執行結束')

如果有幫助您,那我非常榮幸;如果有不正,非常希望您能夠指出,以便改正