1. 程式人生 > 其它 >學會使用Python的threading模組、掌握併發程式設計基礎

學會使用Python的threading模組、掌握併發程式設計基礎

threading模組

Python中提供了threading模組來實現執行緒併發程式設計,官方文件如下:

官方文件

新增子執行緒

例項化Thread類

使用該方式新增子執行緒任務是比較常見的,也是推薦使用的。

簡單的程式碼示例如下,建立3個子執行緒並向其新增任務,然後執行並列印它們的執行緒ID和執行緒名字:

import threading
import time


def task(params):
    print("sub thread run")
    currentThread = threading.current_thread()
    time.sleep(3)
    print("current subthread id : %s\ncurrent subthread name : %s\ncurrent subthread params : %s" % (
        currentThread.ident, currentThread.name, params))


if __name__ == "__main__":
    print("main thread run")
    for item in range(3):
        subThreadIns = threading.Thread(target=task, args=(item, ))
        subThreadIns.start()
    print("main thread run end")

# main thread run
# sub thread run
# sub thread run
# sub thread run
# main thread run end
# current subthread id : 123145534398464
# current subthread name : Thread-1
# current subthread params : 0
# current subthread id : 123145544908800
# current subthread name : Thread-3
# current subthread params : 2
# current subthread id : 123145539653632
# current subthread name : Thread-2
# current subthread params : 1

❶:返回一個執行緒物件,注意args的引數必須是一個tuple,否則丟擲異常,也就是說單實參必須新增逗號

❷:start()方法是指該執行緒物件能夠被系統排程了,但不是立即執行該執行緒,而是等待系統排程後才執行。所以你會看見上面子執行緒的執行順序是0、2、1,另外一個執行緒物件只能執行一次該方法,若多次執行則丟擲RunTimeError的異常。

❸:獲取當前的執行緒物件

❹:獲取當前執行緒物件的編號和名字,以及傳入的引數。當執行緒啟動時,系統都會分配給它一個隨機的編號和名字

首先上述程式碼會先執行主執行緒,然後會建立3個子執行緒並執行。

當子執行緒執行的時候碰到了sleep(3)這種I/O操作時會釋放掉GIL鎖,並將執行緒執行權交還給了主執行緒。

然後主執行緒就執行完畢了,此時主執行緒並不會被kill掉,而是等待子執行緒執行結束後才會被kill掉,而子執行緒則是執行完畢後會被立刻kill掉。

我們可以看見,上面3個任務如果按照序列執行共會花費9.+秒時間,而通過多執行緒來執行,則僅需花費3.+秒的時間,極大的提升了任務處理效率。

自定義類覆寫run()方法

上面的子執行緒任務物件是一個全域性函式,我們也可以將它作為方法來進行呼叫。

書寫一個類並繼承Threading類,覆寫run()方法即可:

import threading
import time


class TaskClass(threading.Thread):  # ❶
    def __init__(self, params):
        self.params = params  # ❷
        super(__class__, self).__init__()

    def run(self):
        print("sub thread run")
        currentThread = threading.currentThread()
        time.sleep(3)
        print("current subthread id : %s\ncurrent subthread name : %s\ncurrent subthread params : %s" % (
            currentThread.ident, currentThread.name, self.params))


if __name__ == "__main__":
    print("main thread run")
    for item in range(3):
        subThreadIns = TaskClass(item)
        subThreadIns.start()
    print("main thread run end")

# main thread run
# sub thread run
# sub thread run
# sub thread run
# main thread run end
# current subthread id : 123145495068672
# current subthread name : Thread-1
# current subthread params : 0
# current subthread id : 123145500323840
# current subthread name : Thread-2
# current subthread params : 1
# current subthread id : 123145505579008
# current subthread name : Thread-3
# current subthread params : 2

❶:必須繼承Threading類並呼叫父類的__init__()方法

❷:傳入的引數

原始碼淺析

為什麼新增子執行緒有2種截然不同的方式呢?它們之間有什麼區別?這些都可以從原始碼中找到答案。

我們從Thread類的例項看起,首先是__init__()方法(threading.py line 738 - 800),它主要做了一些初始化的準備工作:

class Thread:

    _initialized = False

    _exc_info = _sys.exc_info


    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
    
        # 如果group不是None,就會丟擲斷言異常
        assert group is None, "group argument must be None for now"
        
        # 如果kwargs是None,則構造一個空字典
        if kwargs is None:
            kwargs = {}
            
        # 傳入的執行任務的函式或者None
        self._target = target
        
        # 執行緒名字
        self._name = str(name or _newname())
        
        # 任務函式傳入的元組引數
        self._args = args
        # 任務函式傳入的關鍵字引數
        self._kwargs = kwargs
        # 是否是守護執行緒啟動,如果不是None則以守護執行緒模式啟動
        if daemon is not None:
            self._daemonic = daemon
        # 如果是None,則繼承當前執行緒的守護模式
        else:
            self._daemonic = current_thread().daemon
        # 執行緒編號
        self._ident = None
        # 鎖定狀態,None
        self._tstate_lock = None
        # 一把Event事件鎖
        self._started = Event()
        # 是否停止執行的標誌位
        self._is_stopped = False
        # 初始化狀態改為True
        self._initialized = True
 
        self._stderr = _sys.stderr

        _dangling.add(self)

引數釋義:

  • group:應該為None,為了日後擴充套件ThreadGroup類而保留的
  • target:傳入一個可呼叫物件,即執行緒任務task,預設為None,即可以不進行傳入
  • name:執行緒啟動時將不再由系統分配執行緒名稱,而是自定義,預設情況下,系統分配的執行緒名稱會由 "Thread-N" 的格式構成一個唯一的名稱,其中 N 是小的十進位制數
  • args:用於呼叫目標函式的引數元組,預設是()空元組,你必須傳入一個元組
  • kwargs:用於呼叫目標函式的關鍵字引數字典,預設是None,你必須傳入一個字典
  • daemon:命名關鍵字引數,應當傳入一個布林值,預設為None,它會指定該執行緒是否是以守護執行緒模式啟動,如果為None,該執行緒將繼承當前執行緒的守護模式屬性

接下來看start()方法,它是告知系統當前執行緒完成排程,可隨時啟用的方法(threading.py line 828 - 851):

    def start(self):

        # 如果初始狀態不為True,則丟擲異常
        if not self._initialized:
            raise RuntimeError("thread.__init__() not called")

        # 判斷當前執行緒是否被鎖住,如果被鎖住則丟擲異常
        if self._started.is_set():
            raise RuntimeError("threads can only be started once")
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            # 執行引導
            _start_new_thread(self._bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                del _limbo[self]
            raise
    
        self._started.wait()

這裡關鍵是看self._bootstrap()方法,該該方法位於(threading.py line 870 - 888),看看它會做什麼事情:

    def _bootstrap(self):
        try:
            self._bootstrap_inner()
        except:
            if self._daemonic and _sys is None:
                return
            raise

繼續找self._bootstrap_inner()方法,該該方法位於(threading.py line 901 - 964)。

在該方法的916行時,它會執行run()方法:

    def _bootstrap_inner(self):
            ...
            try:
                # 執行run
                self.run()
            except SystemExit:
                pass
            except:
              ...

如果此時你按照第二種新增子執行緒的方式,則直接會執行被子類TaskClass覆寫的run()方法。

如果是第一種新增子執行緒的方式,則還需要往裡面看(threading.py line 835 - 868):

 def run(self):

        try:
            # self._target = 我們自己傳遞的可呼叫物件task
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs

至此可以發現,不管是使用哪一種方式新增子執行緒,都會執行5個方法。

所以說它們內部實現其實都是一樣的,沒什麼特別的,也不要覺得它特別神奇。

threading模組方法大全

以下是threading模組提供的類或方法:

類方法 描述 返回值
threading.Thread(target, args, kwargs) 建立並返回一個執行緒物件 threadObject
threading.Timer(interval, function, args, kwargs) 建立並返回一個延遲啟動的執行緒物件 threadObject
threading.active_count() 獲取當前程序下存活的執行緒數量 int
threading.enumerate() 檢視當前程序存活了的所有執行緒物件,以列表形式返回 [threadObject, ...]
threading.main_thread() 獲取主執行緒物件 threadObject
threading.current_thread() 獲取當前正在執行的執行緒物件 threadObject
threading.currentThread() 獲取當前正在執行的執行緒物件 threadObject
threading.get_ident() 獲取當前正在執行的執行緒物件的編號 int

下面我將使用該程式碼對上述功能進行演示:

import threading
import time

class TaskClass(threading.Thread):

    def run(self):
        time.sleep(3)
        pass

if __name__ == "__main__":
    for i in range(3):
        subThreadIns = TaskClass()
        subThreadIns.start()

1)獲取當前程序下存活的執行緒數量:

print(threading.active_count())

# 4

2)檢視當前程序存活了的所有執行緒物件,以列表形式返回:

print(threading.enumerate())

# [<_MainThread(MainThread, started 4425459136)>, <TaskClass(Thread-1, started 123145449238528)>, <TaskClass(Thread-2, started 123145454493696)>, <TaskClass(Thread-3, started 123145459748864)>]

3)獲取主執行緒物件:

print(threading.main_thread())

# <_MainThread(MainThread, started 4565407168)>

4)獲取當前正在執行的執行緒物件:

print(threading.currentThread())

# <_MainThread(MainThread, started 4383299008)>

5)獲取當前正在執行的執行緒物件的編號:

print(threading.get_ident())

# 4380034496

threadObject方法大全

以下是針對執行緒物件提供的屬性或者方法:

方法/屬性 描述 返回值
threadObject.start() 通知系統該執行緒排程完畢,可以隨時進行啟動,一個執行緒物件只能執行一次該方法,若多次執行則丟擲RunTimeError異常 ...
threadObject.join(timeout=None) 主執行緒預設會等待子執行緒執行結束後再繼續執行,timeou為等待的秒數,如不設定該引數則一直等待。 ...
threadObject.getName() 獲取執行緒物件的名字 str
threadObject.setName(name) 設定執行緒物件的名字 None
threadObject.is_alive() 檢視執行緒物件是否存活 bool
threadObject.isAlive() 檢視執行緒物件是否存活,不推薦使用 bool
threadObject.isDaemon() 檢視執行緒物件是守護執行緒 bool
threadObject.setDaemon() 設定執行緒物件為守護執行緒,主執行緒執行完畢之後設定為守護執行緒的子執行緒便立即結束執行 None
threadObject.ident 獲取執行緒物件的編號 int
threadObject.name 獲取或者設定執行緒物件的名字 str or None
daemon 檢視執行緒物件是守護執行緒 bool

主執行緒阻塞

預設情況下,當子執行緒啟動後,主執行緒會依舊往下執行而不是等待所有的子執行緒執行完畢後再繼續往下執行。

如圖所示,主執行緒在執行結束後並不會被理解kill掉,而是所有的子執行緒執行完畢後主執行緒才會被kill掉:

我們可以利用threadObject.join(timeout=None)來讓主執行緒等待子執行緒執行完畢後再繼續向下執行,timeout為等待的秒數,如不設定該引數則一直等待。

如圖所示,這是沒有設定timeout的示意圖,主執行緒必須等待所有子執行緒執行完畢後再接著執行:

程式碼示例:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    print("main thread start run")
    threadLst = []
    
    for i in range(3):
        threadLst.append(TaskClass())
    for ins in threadLst:
        ins.start()  # 開始執行所有子執行緒
    for ins in threadLst:
        ins.join()   # 讓主執行緒等待所有子執行緒執行完畢後再接著執行,注意,設定主執行緒等待的子執行緒必須處於活躍狀態
        
    print("main thread carry on run")
    print("main thread run end")
    
# main thread start run
# Thread-1 start run
# Thread-2 start run
# Thread-3 start run
# Thread-1 run end
# Thread-2 run end
# Thread-3 run end
# main thread carry on run
# main thread run end

守護執行緒

守護執行緒是指當主執行緒執行完畢後,子執行緒是否還要繼續執行。

預設threadObject.setDaemon()為None,也就是False,即當前主執行緒執行完畢後,子執行緒依舊可以接著執行。

如果threadObject.setDaemon()為True,則當前主執行緒執行完畢後,子執行緒即使沒有執行完畢也會結束執行。

程式碼示例:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    print("main thread start run")
    threadLst = []
    for i in range(3):
        threadLst.append(TaskClass())
    for ins in threadLst:
    
        # 注意,守護執行緒的設定必須線上程未啟動時設定
        ins.setDaemon(True)
        ins.start()

    print("main thread carry on run")
    print("main thread run end")

# main thread start run
# Thread-1 start run
# Thread-2 start run
# Thread-3 start run
# main thread carry on run
# main thread run end

join()與setDaemon(True)共存

如果同時設定setDaemon(True)與join()方法會怎麼樣呢?有兩種情況:

  1. join()方法沒有設定timeout(沒有設定即表示死等)或者timeout的時間比子執行緒作業時間要長,這代表子執行緒會死在主執行緒之前,setDaemon(True)也就沒有了意義,即失效了
  2. join()設定了timeout並且timeout的時間比子執行緒作業時間要短,這代表主執行緒會死在子執行緒之前,setDaemon(True)生效,子執行緒會跟著主執行緒一起死亡。

情況一:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    subThread = TaskClass()
    subThread.setDaemon(True) # 主執行緒執行完後會立即終止子執行緒的執行。但是由於有join(),故不生效。
    subThread.start()
    subThread.join() # 主執行緒必須等待子執行緒執行結束後再接著執行
    print("main thread run end")

# Thread-1 start run
# Thread-1 run end
# main thread run end

情況2:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    subThread = TaskClass()
    subThread.setDaemon(True) # 主執行緒執行完後會立即終止子執行緒的執行。但是由於有join(),故不生效。
    subThread.start()
    subThread.join(1) # 主執行緒必須等待子執行緒執行結束後再接著執行,只等待1s
    print("main thread run end")

# Thread-1 start run
# main thread run end

執行緒延遲啟動

使用threading模組中提供的Timer類,可讓子執行緒延遲啟動,如下所示:

import threading
import time


def task():
    print("sub thread start run")
    time.sleep(3)
    print("sub thread run end")


if __name__ == "__main__":
    print("main thread run")
    t1 = threading.Timer(interval=3, function=task)
    t1.start()  # 3秒後才啟動子執行緒
    t1.join()
    print("main thread run end")

# main thread run
# sub thread start run
# sub thread run end
# main thread run end

如果要用類的形式,則可以繼承threading.Timer()類,並修改self.function屬性,個人極度不推薦這樣做。

如下所示,在不知道某一個方法怎麼使用時扒扒原始碼看一看,翻翻官方文件就大概能瞭解:

import threading
import time


class TaskClass(threading.Timer):
    def __init__(self, *args, **kwargs):
        # 必須要修改function為你想執行的方法
        super(__class__, self).__init__(*args, **kwargs)
        self.function = self.task

    def task(self, x, y):
        print("sub thread start run")
        time.sleep(3)
        print("parmas %s %s" % (x, y))
        print("sub thread run end")


if __name__ == "__main__":
    # 必須傳入一個None
    t1 = TaskClass(interval=3, function=None, args=(1, 2))
    t1.start()
    t1.join()
    print("main thread run end")

# sub thread start run
# parmas 1 2
# sub thread run end
# main thread run end

多執行緒程式設計應用場景

由於GIL鎖的存在,Python中對於I/O操作來說可以使用多執行緒程式設計,如果是計算密集型的操作則不應該使用多執行緒進行處理,因為沒有I/O操作就不能通過I/O切換來執行其他執行緒,故對於計算密集型的操作來說多執行緒沒有什麼優勢,甚至還可能比普通序列還慢(因為涉及到執行緒切換,雖然是毫秒級別,但是計算的數值越大這個切換也就越密集,GIL鎖是100個CPU指令切換一次的)

注意:我們是在Python2版本下進行此次測試,Python3版本確實相差不大,但是,從本質上來說依然是這樣的。

計算密集型程式的普通序列執行時間:

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000): # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    add()
    sub()

    end_time = time.time()
    print("執行時間:",end_time - start_time)
    
    
# ==== 執行結果 ==== 三次採集

"""
大約在 1.3 - 1.4 秒
"""

計算密集型程式的多執行緒併發執行時間:

# coding:utf-8

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000):  # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    t1 = threading.Thread(target=add,)
    t2 = threading.Thread(target=sub,)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    end_time = time.time()
    print(u"執行時間:",end_time - start_time)

# ==== 執行結果 ==== 三次採集

"""
大約 4 - 5 秒
"""