1. 程式人生 > 實用技巧 >【Python】多工

【Python】多工

博觀而約取,厚積而薄發。

1. 多工介紹

多工是指在同一時間內執行多個任務,例如: 現在電腦安裝的作業系統都是多工作業系統,可以同時執行著多個軟體。

1.1 多工的執行方式

  • 併發

指在一個時間段內,有多個程式在同一個cpu上執行,但是任意時刻只有一個程式在cpu上執行

例如:

對於單核cpu處理多工,不可能存在同一時刻有多個軟體同時執行,只不過是CPU執行時間劃分成若干個時間段交給不同的程式執行,他們之間是交替執行。但是由於cpu執行速度太快,所以我們感覺就像這些軟體都在同時執行一樣。

  • 並行

在任意時刻,有多個程式同時執行在多個cpu上

例如:

對於多核cpu處理多工,由於各個任務不存在爭奪cpu的執行許可權,也就保證了每個任務獨自佔用一個cpu核心,此時每個任務在任意時刻他們都是同時執行,不存在交替執行的過程。前提是在任務數小於cpu核數,但是無論任務數有多少,任意時刻不同cpu核心上的任務都是同時執行的。

  • 區別:

併發和並行是即相似又有區別的兩個概念,並行是指兩個或者多個事件在同一時刻發生;而併發是指兩個或多個事件在同一時間間隔內發生。在多道程式環境下,併發性是指在一段時間內巨集觀上有多個程式在同時執行,但在單處理機系統中,每一時刻卻僅能有一道程式執行,故微觀上這些程式只能是分時地交替執行。倘若在計算機系統中有多個處理機,則這些可以併發執行的程式便可被分配到多個處理機上,實現並行執行,即利用每個處理機來處理一個可併發執行的程式,這樣,多個程式便可以同時執行。

2. 程序

在Python程式中,想要實現多工可以使用程序來完成,程序是實現多工的一種方式。一個正在執行的程式或者軟體就是一個程序,它是作業系統進行資源分配的基本單位

,也就是說每啟動一個程序,作業系統都會給其分配一定的執行資源(記憶體資源)保證程序的執行。

注意:

一個程式執行後至少有一個程序,一個程序預設有一個執行緒稱為主執行緒,程序裡面可以建立多個執行緒,執行緒是依附在程序裡面的,沒有程序就沒有執行緒

2.1多程序使用

方式--fork

標準庫模組os中提供一個函式fork(),用於將當前程序複製一份子程序,而後父程序和子程序從呼叫fork()處開始分叉,兵分兩路,繼續並行執行後面的程式與普通函式不同的是,函式fork()會返回兩次,分別在父程序和子程序內返回,返回值分為三種情況:

  1. 返回值小於0,表示複製子程序失敗
  2. 返回值等於0,表示處於在子程序中
  3. 返回值大於0,表示處於父程序中,返回值就是子程序的id

windows作業系統中無法呼叫函式fork()

import os

try:
    pid = os.fork()
except OSError:
    print("作業系統不支援")
    exit()
if pid < 0:
    print("複製子程序失敗")
elif pid == 0:
    print("我是子程序%d,我的父程序是%d" % (os.getpid(), os.getppid()))
else:
    print("我是父程序%d,我的子程序是%d" % (os.getpid(), pid))

# 我是父程序30007,我的子程序是30009
# 我是子程序30009,我的父程序是30007

方式一:

標準庫模組multiprocessing提供了一個類物件Process,用於表示程序

  1. 根據類物件Process建立程序例項物件

  2. 呼叫程序例項物件的方法start()啟動程序,呼叫start()後,會自動呼叫run(),方法run()會自動呼叫target指定的函式

from multiprocessing import Process, current_process

print("父程序啟動(%d----%s)" % (current_process().pid, current_process().name))

def do_something(*args):
    print("子程序啟動(%d----%s)" % (current_process().pid, current_process().name))
    print(args)
    print("子程序結束(%d----%s)" % (current_process().pid, current_process().name))

process = Process(target=do_something, args=(5, 10))
process.start()

import time

time.sleep(2)
print("父程序結束(%d----%s)" % (current_process().pid, current_process().name))

Process的__init__方法:

def __init__(self, group=None, target=None, name=None, args=(), kwargs={},*, daemon=None):
  • group:用於指定程序例項物件所屬的程序組,預設不屬於任何程序組
  • target:用於制定被run()呼叫的函式,預設沒有函式被呼叫
  • name:用於制定建立程序例項物件的名稱,第n個子程序的預設名稱為Process-n
  • args:用於制定target接受的未知引數,用元組表示,預設不接受位置引數
  • kwargs:用於制定target接受的關鍵值引數,用字典表示,預設不接受關鍵字引數
  • daemon:守護程序

方式二:

  1. 自定義繼承自Process的類物件,重寫特殊方法__init__()run()
  2. 根據自定義的類物件建立程序例項物件
  3. 呼叫程序例項物件的方法start()啟動程序,呼叫start()後,會自動呼叫重寫的run()
from multiprocessing import Process, current_process

print("父程序啟動(%d----%s)" % (current_process().pid, current_process().name))

class MyProcess(Process):
    def __init__(self, name, args):
        super().__init__(name=name)
        self.args = args

    def run(self):
        print("子程序啟動(%d----%s)" % (current_process().pid, current_process().name))
        print(self.args)
        print("子程序結束(%d----%s)" % (current_process().pid, current_process().name))

m = MyProcess(name="MyProcess", args=(5, 10))
m.start()

import time

time.sleep(2)
print("父程序結束(%d----%s)" % (current_process().pid, current_process().name))

2.2 多程序執行的不確定性

from multiprocessing import Process, current_process

import time

def do_something():
    for i in range(4):
        print("%s:%d" % (current_process().name, i))
        time.sleep(2)

for i in range(3):
    Process(target=do_something).start()

do_something()

執行結果:

MainProcess:0
Process-1:0
Process-2:0
Process-3:0
MainProcess:1
Process-1:1
Process-2:1
Process-3:1
Process-1:2
MainProcess:2
Process-2:2
Process-3:2
Process-1:3
MainProcess:3
Process-2:3
Process-3:3

本次測試程式啟動了三個子程序去執行do_something方法,之後主程序也執行該方法,通過執行結果可以看出,每個程序之間執行是沒有順序。預設情況下,多個程序的執行順序和時間都是不確定的,完全取決於作業系統的排程

2.3 守護程序

import time
from multiprocessing import Process, current_process

print("父程序%d啟動" % current_process().pid)

class MyProcess(Process):
    def run(self):
        print("子程序%d啟動" % current_process().pid)
        time.sleep(2)
        print("子程序%d結束" % current_process().pid)

m = MyProcess()
# m = MyProcess(daemon=True)
m.daemon = True
m.start()

print("父程序%d結束" % current_process().pid)

>>>父程序13717啟動
>>>子程序13721啟動
>>>父程序13717結束

可以在呼叫程序例項物件方法的start()之前,將屬性daemon的值設定為True,或直接通過建立物件傳入關鍵字引數,從而將程序設定為守護程序。守護程序是為了守護父程序而存在的子程序,當父程序結束時,守護程序就沒有了存在的意義,因此,守護程序會隨著父程序的結束而結束

2.4 阻塞父程序的子程序方法join

比如:現在有個需求,需要在子程序的程式執行完成以後接著主程序繼續執行,但是我們不知道子程序需要執行多長時間,此時就需要使用join方法。他能保證無論子程序執行多久,只有在子程序執行完畢以後,才會執行join方法之後的程式,此時方法join就處於阻塞狀態。

import time
from multiprocessing import Process, current_process

print("父程序%d啟動" % current_process().pid)

class MyProcess(Process):
    def run(self):
        print("子程序%d啟動" % current_process().pid)
        time.sleep(2)
        print("子程序%d結束" % current_process().pid)

m = MyProcess()
m.start()
# m.join()
m.join(1)
print("父程序%d結束" % current_process().pid)

>>>父程序13921啟動
>>>子程序13926啟動
>>>父程序13921結束
>>>子程序13926結束

在父程序中建立並啟動子程序後,可以呼叫子程序的方法join(),這樣子程序會把父程序阻塞,父程序會等子程序執行完之後再從被阻塞的地方繼續執行

在呼叫join()時,可以制指定引數timeout,從而指定子程序阻塞父程序的時間

2.5 全域性變數多個程序不共享

from multiprocessing import Process

NUM = 18

def do_something():
    global NUM
    NUM += 1

p = Process(target=do_something)
p.start()
p.join()

# 在子程序中修改全域性變數,對父程序中的全域性變數沒有影響
# 因為:子程序對父程序中的全域性變數做了一份拷貝,子程序與父程序中的全域性變數NUM是完全不同的兩個變數
print(NUM)  # 18

每個程序都有獨立的記憶體空間,從而程序間是相互獨立的,因此,全域性變數在多個程序之間不能共享。

2.6 多程序操作共享資料是不安全的

from multiprocessing import Process, Value

NUM = Value('i', 0)

def do_something():
    global NUM
    for i in range(10000):
        # 相當於:num = num +1
        # 首先計算num+1,存入臨時變數中,然後將臨時變數的值賦給NUM
        NUM.value += 1

p1 = Process(target=do_something)
p2 = Process(target=do_something)

p1.start()
p2.start()
p1.join()
p2.join()

print(NUM.value)  # 小於20000

由於多程序的執行是不確定的,導致多程序操作共享資料的結果是不可預期的,這也常被稱為不安全的

3. 執行緒

在Python中,想要實現多工除了使用程序,還可以使用執行緒來完成,執行緒是實現多工的另外一種方式。執行緒是程序中執行程式碼的一個分支,每個執行分支(執行緒)要想工作執行程式碼需要cpu進行排程 ,也就是說執行緒是cpu排程的基本單位,每個程序至少都有一個執行緒,而這個執行緒就是我們通常說的主執行緒。

3.1 多執行緒使用

任何程序都會自動建立並啟動一個執行緒,該執行緒被稱為父(主)執行緒,預設名稱是MainThread通過父程序建立的子程序也會自動建立一個執行緒

方式一:

標準庫模組threading提供了一個類物件Thread,用於表示執行緒

  1. 根據類物件Thread建立執行緒例項物件

  2. 呼叫執行緒例項物件的方法start()啟動執行緒,呼叫start()後,會自動呼叫run(),方法run()會自動呼叫target指定的函式

from threading import Thread, currentThread

print("父執行緒啟動(%d----%s)" % (currentThread().ident, currentThread().name))


def do_something(*args):
    print("子執行緒啟動(%d----%s)" % (currentThread().ident, currentThread().name))
    print(args)
    print("子執行緒結束(%d----%s)" % (currentThread().ident, currentThread().name))


thread = Thread(target=do_something, args=(5, 10), name="MyThread")
thread.start()

import time

time.sleep(2)
print("父執行緒結束(%d----%s)" % (currentThread().ident, currentThread().name))

# 執行結果:
>>>父執行緒啟動(140450223359808----MainThread)
>>>子執行緒啟動(140450214668032----MyThread)
>>>(5, 10)
>>>子執行緒結束(140450214668032----MyThread)
>>>父執行緒結束(140450223359808----MainThread)

Thread的__init__方法:

def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):
  • group:用於指定執行緒例項物件所屬的執行緒組,預設不屬於任何執行緒組
  • target:用於制定被run()呼叫的函式,預設沒有函式被呼叫
  • name:用於制定建立執行緒例項物件的名稱,第n個子執行緒的預設名稱為Thread-n
  • args:用於制定target接受的未知引數,用元組表示,預設不接受位置引數
  • kwargs:用於制定target接受的關鍵值引數,用字典表示,預設不接受關鍵字引數
  • daemon:用於制定執行緒例項物件是否是守護執行緒,預設不是守護執行緒

方式二:

  1. 自定義繼承自Thread的類物件,重寫特殊方法__init__()run()
  2. 根據自定義的類物件建立執行緒例項物件
  3. 呼叫執行緒例項物件的方法start()啟動執行緒,呼叫start()後,會自動呼叫重寫的run()
from threading import Thread, current_thread

print("父執行緒啟動(%d----%s)" % (current_thread().ident, current_thread().name))

class MyThread(Thread):
    def __init__(self, name, args):
        super().__init__(name=name)
        self.args = args

    def run(self):
        print("子執行緒啟動(%d----%s)" % (current_thread().ident, current_thread().name))
        print(self.args)
        print("子執行緒結束(%d----%s)" % (current_thread().ident, current_thread().name))

m = MyThread(name="MyThread", args=(5, 10))
m.start()

import time

time.sleep(2)
print("父執行緒結束(%d----%s)" % (current_thread().ident, current_thread().name))


# 執行結果:
>>>父執行緒啟動(140450223359808----MainThread)
>>>子執行緒啟動(140450214668032----MyThread)
>>>(5, 10)
>>>子執行緒結束(140450214668032----MyThread)
>>>父執行緒結束(140450223359808----MainThread)

3.2 多執行緒執行的不確定性

同多程序。預設情況下,多個執行緒的執行順序和時間都是不確定的,完全取決於作業系統的排程

from threading import Thread, current_thread

import time

def do_something():
    for i in range(5):
        print("%s:%d" % (current_thread().name, i))
        time.sleep(2)

for i in range(3):
    Thread(target=do_something).start()

do_something()

執行結果:

Thread-1:0
Thread-2:0
Thread-3:0
MainThread:0
Thread-2:1
Thread-3:1
MainThread:1
Thread-1:1
Thread-2:2
Thread-3:2
MainThread:2
Thread-1:2
Thread-2:3
Thread-3:3
Thread-1:3
MainThread:3

3.3 守護執行緒

在建立執行緒例項物件時可以將引數daemon的值設定為True,從而將執行緒設定為守護執行緒,守護執行緒是為了守護父執行緒而存在的子執行緒,當父執行緒結束時,守護執行緒就沒有了存在的意義,因此,守護執行緒會隨著父執行緒的結束而結束

import time
from threading import Thread, current_thread

print("父執行緒%d啟動" % current_thread().ident)

class MyThread(Thread):

    def run(self):
        print("子執行緒%d啟動" % current_thread().ident)
        time.sleep(2)
        print("子執行緒%d結束" % current_thread().ident)

# m = MyThread(daemon=True)
m = MyThread()
m.setDaemon(True)
# m.daemon = True
m.start()

time.sleep(1)

print("父執行緒%d結束" % current_thread().ident)

3.4 阻塞父執行緒的子執行緒方法join

在父執行緒中建立並啟動子執行緒後,可以呼叫子執行緒的方法join(),這樣子執行緒會把父執行緒阻塞,父執行緒會等子執行緒執行完之後再從被阻塞的地方繼續執行在呼叫join()時,可以制指定引數timeout,從而指定子執行緒阻塞父執行緒的時間

import time
from threading import Thread, current_thread

print("父執行緒%d啟動" % current_thread().ident)

class MyThread(Thread):
    def run(self):
        print("子執行緒%d啟動" % current_thread().ident)
        time.sleep(2)
        print("子執行緒%d結束" % current_thread().ident)

m = MyThread()
m.start()
# m.join()
m.join(1)
print("父執行緒%d結束" % current_thread().ident)

3.5 全域性變數在程序的所有執行緒中可以共享

from threading import Thread

NUM = 18

def do_something():
    global NUM
    NUM += 1

t = Thread(target=do_something)
t.start()
t.join()

print(NUM)  # 19

程序內的所有執行緒共享記憶體空間,所以全域性變數在程序的所有執行緒中可以共享

3.6 多執行緒操作共享資料是不安全的

from threading import Thread
import dis

NUM = 0

def do_something():
    global NUM
    for i in range(1000000):
        # 相當於:num = num +1
        # 首先計算num+1,存入臨時變數中,然後將臨時變數的值賦給NUM
        NUM += 1

t1 = Thread(target=do_something)
t2 = Thread(target=do_something)

t1.start()
t2.start()
t1.join()
t2.join()

print(NUM)  # 小於2000000

由於多執行緒的執行是不確定的,導致多執行緒操作共享資料的結果是不可預期的,這也常被稱為不安全的

4. 程序池

4.1 程序池Pool

如果併發的任務數過多,一次性建立並啟動大量的程序會給計算機帶來很大的壓力,那麼就可以使用程序池對建立與啟動的程序進行限制和管理程序池中能所容納的程序數目是固定的。

標準庫模組multiprocessing中提供一個類物件Pool,用於表示程序池,程序池中所能容納的程序數目可以在建立Pool例項物件時程序指定,如果不指定,預設大小是CPU的核數

from multiprocessing import Pool
import time, random

print("父程序啟動")

def do_something(i):
    print("子程序%d啟動" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子程序%d結束,耗時%.2f秒" % (i, end - start))

p = Pool(3) # 將程序池所能容納的最大程序數指定為3

for i in range(1, 11):
    # 與start()類似,不同的是,建立並啟動有程序池管理的子程序
    p.apply_async(do_something, args=(i,))

# 呼叫方法join()之前,必須呼叫close()
# 呼叫close()之後就不能讓程序池在管理新的程序了
p.close()

# 父程序被阻塞
# 程序池管理的所有子程序執行完之後,父程序再從被阻塞的地方繼續執行
p.join()

print("父程序結束")

# 程式執行後同時建立並執行3個子程序,第四個子程序要等前面三個中的某一個執行完畢之後,才會建立並啟動

4.2 ProcessPoolExecutor

在標準庫模組concurrent.futures中提供了一個類物件ProcessPoolExecutor,也用於表示程序池。與Pool相比,ProcessPoolExecutor的功能和效能更加強大。

from concurrent.futures import ProcessPoolExecutor
import time, random

print("父程序啟動")

def do_something(i):
    print("子程序%d啟動" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子程序%d結束,耗時%.2f秒" % (i, end - start))

# 將程序池所能容納的最大程序數指定為3
p = ProcessPoolExecutor(max_workers=3)

# 將需要程序池處理的任務全部交給程序池,此後會建立並啟動由程序池管理的子程序
for i in range(1, 11):
    p.submit(do_something, i)

# 父程序被阻塞
# 程序池管理的所有子程序執行完之後,父程序再從被阻塞的地方繼續執行
p.shutdown(wait=True)

print("父程序結束")

ProcessPoolExecutor的父類物件Executor遵守了上下文管理解析,所以可以使用with語句,這樣在離開執行時上下文是會自動呼叫shutdown(wait=True)方法

原始碼 concurrent.futures._base.py 下面的類物件Executor

def __enter__(self):
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
with ProcessPoolExecutor(max_workers=3) as p:
    """
    for i in range(1, 11):
        p.submit(do_something, i)
    """
    # 等價於:
    p.map(do_something, range(1, 11))

4.3 ProcessPoolExecutor物件方法

  1. submit(self, fn, *args, **kwargs):

返回值是一個Future例項物件,表示子程序所呼叫的那個函式的執行,比如:do_something(),可以呼叫Futureresult()得到這個函式的返回值。方法result()是一個同步方法,直到這個函式執行完畢之後方法result()才會返回

with ProcessPoolExecutor(max_workers=5) as p:
     for i in range(1, 11):
        future = p.submit(do_something, i)
        # 同步,需要等待do_something執行完畢
        print(future.result())

p.submit(do_something, i)是立刻返回,可以通過列表儲存返回的物件,之後遍歷獲取返回值

with ProcessPoolExecutor(max_workers=3) as p:
    obj_list = []
    for i in range(1, 11):
        future = p.submit(do_something, i)
        # 非同步,無需等待do_something執行完畢
        print(future)
        obj_list.append(future)

for obj in obj_list:
    print(obj.result())
  1. wait(fs, timeout=None, return_when=ALL_COMPLETED):

該函式用於阻塞父程序,以等待指定的Future例項物件序列,直到滿足條件

  • fs : 用於指定要等待的Future物件例項序列

  • timeout : 用於指定等待的最長時間,如果指定為None或不指定,則一直等待

  • return_when : 用於指定這個函式何時反覆,有三種取值:

    • FIRST_COMPLETED :當第一個Future例項物件已經完成或取消時

    • FIRST_EXCEPTION :當第一個Future例項物件丟擲異常時

    • ALL_COMPLETED :當所有Future例項物件已經完成或已被取消時

該函式的返回值是有兩個集合組成的元組,第一個集合包含了已經完成或已被取消的所有Future例項物件,第二個集合包含了沒有完成並且沒有被取消的Future例項物件

from concurrent.futures import ProcessPoolExecutor, wait, as_completed, ALL_COMPLETED, FIRST_COMPLETED

import time, random

def do_something(i):
    time.sleep(random.random() * 10)
    return i * i

p = ProcessPoolExecutor(max_workers=3)
objs = []

for i in range(1, 4):
    future = p.submit(do_something, i)
    objs.append(future)
    
(done, not_done) = wait(objs, return_when=ALL_COMPLETED)
print(done)  # 已經完成的Future物件例項的集合
print(not_done)  # 未完成的Future物件例項的集合
  1. as_completed(fs, timeout=None):

該函式用於將指定的Future例項物件序列轉換為一個迭代器,當序列中的任意一個Future例項物件已經完成或已被取消時都會被yield。這樣,通過遍歷得到的迭代器,就可以在任意一個Future例項物件已經完成或已被取消時立即做一些處理,比如:呼叫result()方法得到執行結果

  • fs : 用於指定Future例項物件的序列
  • timeout : 用於指定超時時間,如果指定為None或不指定,則不會超時
for i in range(1, 4):
    future = p.submit(do_something, i)
    objs.append(future)

future_iterator = as_completed(objs)
for future in future_iterator:
    print(future.done())
    print(future.cancel())
    print(future.result())

# executor.done() # 檢視任務是否完成(True/False)
# executor.cancel() # 取消等待的任務,在執行中或執行完成的任務無法取消
# executor.result() # 獲取任務函式返回的結果

5. 執行緒池

5.1 執行緒池ThreadPool

第三方庫threadpool中提供一個類物件ThreadPool,用於表示執行緒池,執行緒池中所能容納的執行緒數目可以在建立ThreadPool例項物件時執行緒指定,如果不指定,預設大小是CPU的核數

from threadpool import ThreadPool, makeRequests
import time, random

print("父執行緒啟動")

def do_something(i):
    print("子執行緒%d啟動" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子執行緒%d結束,耗時%.2f秒" % (i, end - start))

t = ThreadPool(3)

# 建立需要執行緒池處理的任務
requests = makeRequests(do_something, args_list=range(1, 11))

# 將需要執行緒池處理的任務全部交給執行緒池,此後會建立並啟動執行緒由執行緒池管理的子執行緒
for req in requests:
    t.putRequest(req)

# 父執行緒被阻塞
# 執行緒池管理的所有子執行緒執行完之後,互相陳再從阻塞的地方繼續執行
t.wait()

print("父執行緒結束")

# 程式執行後同時建立並執行3個子執行緒,第四個子執行緒要等前面三個中的某一個執行完畢之後,才會建立並啟動

5.2 ThreadPoolExecutor

與程序池ProcessPoolExecutor用法一致

6. 同步(適用於程序和執行緒)

為了保證多個執行緒(程序)能安全的操作共享資料,必須確保一個執行緒(程序)在操作共享資料時,其他執行緒(程序)都不能操作。

一個執行緒(程序)A在操作共享資料前必須先試圖獲得鎖,從而給相關程式碼上鎖,執行緒A獲得鎖之後,鎖的狀態變成為locked。如果另外一個執行緒(程序)B試圖獲得鎖,執行緒(程序)B的狀態會變成為blocked並且被新增到鎖等待池,只能等待獲得鎖的執行緒(程序)A在釋放鎖之後,鎖的狀態變為unlocked,執行緒(程序)排程程式再從鎖等待池中處於狀態blocked的執行緒(程序)中選擇一個來獲得鎖,獲的鎖之後該執行緒(程序)的狀態變為running。由於只有一把鎖,無論多少個執行緒(程序),同一時刻最多隻有一個執行緒(程序)能獲得該鎖,這樣就確保了操作共享資料的相關程式碼只能有一個執行緒(程序)從頭到尾完成的執行,從而確保了多個執行緒(程序)操作共享資料總是安全的。但是包含鎖的相關程式碼只能以單執行緒模式執行,因此效率大大降低了。

6.1 互斥鎖Lock

  • 多執行緒同步之Lock

標準庫模組threading中提供了一個類物件Lock,用於表示鎖,以實現多執行緒之間的同步簡單的說:同步就意味著”阻塞和等待“。為了保證獲得鎖的執行緒(程序)用完了後一定要釋放鎖,可以將操作共享資料的程式碼放在try語句塊中,把釋放鎖的程式碼放在finally語句塊。由於類物件Lock遵循了上下文管理協議,所以可以使用with語句進行簡化,這樣,在進入執行時上下文是會自動呼叫方法require(),在離開執行時上下文是會自動呼叫release()

from threading import Thread, Lock

lock = Lock() # 定義鎖,預設狀態是解鎖
NUM = 0

def do_something():
    global NUM
    for i in range(1000000):
        """
        lock.acquire() # 上鎖
        try:
            NUM += 1
        finally:
            lock.release() # 釋放鎖(解鎖)
        """
        with lock:
            NUM += 1

t1 = Thread(target=do_something)
t2 = Thread(target=do_something)

t1.start()
t2.start()
t1.join()
t2.join()

print(NUM)  # 2000000
  • 多程序同步之Lock

標準庫模組multiprocessing中提供了一個類物件Lock,用於表示鎖,以實現多程序之間的同步.

from multiprocessing import Process, Lock, Value

lock = Lock()
NUM = Value('i', 0)

def do_something():
    global NUM
    for i in range(1000000):
        """
        lock.acquire()
        try:
            NUM += 1
        finally:
            lock.release()
        """
        with lock:
            NUM.value += 1

p1 = Process(target=do_something)
p2 = Process(target=do_something)

p1.start()
p2.start()
p1.join()
p2.join()

print(NUM.value)  # 2000000

6.2 死鎖

6.2.1 執行緒死鎖

如果有多個共享資料,在多個執行緒操作這多個共享資料時,如果兩個執行緒分別通過加鎖佔有一部分共享資料,並且同時等待對方釋放鎖,這樣就會導致兩個執行緒永遠相互等待而產生死鎖。要避免程式中出現死鎖的情況,在避免死鎖的演算法中最有代表性的演算法是Dijkstra提出的銀行家演算法

from threading import Thread, Lock, current_thread

numa = 0
numb = 0
locka = Lock()
lockb = Lock()

def do_something():
    fun1()
    fun2()

def fun1():
    global numa, numb
    locka.acquire()

    try:
        print("%s------func1()-------locka" % current_thread().ident)
        numa += 1
        lockb.acquire()
        try:
            print("%s------func1()-------lockb" % current_thread().ident)
            numb += 1
        finally:
            lockb.release()
    finally:
        locka.release()

def fun2():
    global numa, numb
    lockb.acquire()
    try:
        print("%s------func2()-------lockb" % current_thread().ident)
        numb += 1
        locka.acquire()
        try:
            print("%s------func2()-------locka" % current_thread().ident)
            numb += 1
        finally:
            locka.release()
    finally:
        lockb.release()

t_list = []
for i in range(100):
    t = Thread(target=do_something)
    t_list.append(t)
    t.start()

for item in t_list:
    item.join()

print("父執行緒結束")

執行緒一執行func1(),優先獲得locka的鎖,此時執行緒二執行func2(),優先獲得lockb鎖,之後方法func1()需要獲取lockb鎖,方法func2()需要獲取locka鎖,也就是各個執行緒都需要另外一個執行緒的資源,但是在獲取此前他們各自都沒有解鎖,也就導致他們都沒法上鎖,始終保持僵持的狀態。這一種狀態就是死鎖。

6.2.2 程序死鎖

程序通過共享記憶體訪問全域性變數,同樣可以利用上述測試程式碼

from multiprocessing import Process, Lock, current_process,Value

numa = Value('i',0)

numb = Value('i',0)

locka = Lock()
lockb = Lock()

......

6.3 RLock

在同一執行緒中,當呼叫了Lock的方法acquire()之後,如果在呼叫方法release()之前再次呼叫了方法acquire(),也會導致死鎖

from threading import Thread, current_thread, RLock, Lock

lock = Lock()
lock.acquire()
print("獲得鎖")

lock.acquire()
print("獲得鎖")

lock.release()
print("釋放鎖")

lock.release()
print("釋放鎖")

>>>獲得鎖
>>>

標準庫模組threading中還提供了一個用於表示鎖的類物件RLock(可重入鎖)。與Lock相同的是:RLock也提供了獲得鎖的方法acquire(),和釋放鎖的方法release()Lock不同的是:在同一個執行緒中,當呼叫了RLock的方法acquire()之後。可以再呼叫方法release()之前可以多次呼叫方法acquire()而不會導致死鎖。

lock = RLock()

lock.acquire()
print("獲得鎖")
lock.acquire()
print("獲得鎖")
lock.release()
print("釋放鎖")
lock.release()
print("釋放鎖")

>>>獲得鎖
>>>獲得鎖
>>>釋放鎖
>>>釋放鎖

本質:

pass

6.4 Condition

標準庫模組threading中提供了一個類物件Condition,用於表示帶觸發條件的鎖,以幫助我們處理多執行緒間複雜的同步問題,Condition允許一個或多個執行緒等待觸發條件,直到收到另一個執行緒的通知

from threading import Thread, Condition
import time

cond = Condition()

class MyThread1(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        cond.acquire()

        print("%s:1" % self.name)
        cond.notify()
        cond.wait()

        # 思考兩秒再說
        time.sleep(2)
        print("%s:11" % self.name)
        cond.notify()
        cond.wait()

        # 思考兩秒再說
        time.sleep(2)
        print("%s:111" % self.name)
        cond.notify()
        cond.release()

class MyThread2(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        time.sleep(1)
        cond.acquire()

        # 思考兩秒再說
        time.sleep(2)
        print('%s:2' % self.name)
        cond.notify()
        cond.wait()

        time.sleep(2)
        print('%s:22' % self.name)
        cond.notify()
        cond.wait()

        time.sleep(2)
        print('%s:222' % self.name)
        cond.release()

MyThread1("Thread1").start()
MyThread2("Thread2").start()

>>>Thread1:1
>>>Thread2:2
>>>Thread1:11
>>>Thread2:22
>>>Thread1:111
>>>Thread2:222

本質:

pass

6.5 生產者消費者(多執行緒Condition)

生產者消費者問題:假設和有一群生產者(Producer)和一群消費者(Consumer)通過一個市場來交換產品。

  • 生產者策略:如果市場剩餘產品小於20個,那麼就生產4個產品放到市場
  • 消費者策略:如果市場上剩餘產品多於10個,那麼就從市場上消費3個產品
from threading import Thread, Condition
import time

cond = Condition()
count = 0

class Producer(Thread):
    def run(self):
        global count, cond
        while True:
            cond.acquire()
            if count < 20:
                count += 4
                print("%s:生產者生產了4個,當前總共%d個" % (self.name, count))
                cond.notify()
            else:
                print("%s:不生產,等待" % self.name)
                cond.wait()
            cond.release()
            time.sleep(2)

class Consumer(Thread):
    def run(self):
        global count, cond
        while True:
            cond.acquire()
            if count > 10:
                count -= 3
                print("%s:消費者消費了3個,當前總共%d個" % (self.name, count))
                cond.notify()
            else:
                print("%s:不消費,等待" % self.name)
                cond.wait()
            cond.release()
            time.sleep(2)

for i in range(3):
    Producer().start()

for i in range(3):
    Consumer().start()

6.6 Semaphore(訊號量)

標準庫模組threading中提供了類物件Semaphore,用於表示訊號量,他可以幫助我們控制併發執行緒的最大數量從而實現多執行緒之間的同步

from threading import Semaphore, Thread
import time, random

sem = Semaphore(3)

class MyThread(Thread):
    def run(self):
        """
        sem.acquire()
        print("%s獲得資源" % self.name)
        time.sleep(random.random() * 10)
        sem.release()
        """
        with sem:
            print("%s獲得資源" % self.name)
            time.sleep(random.random() * 10)

for i in range(10):
    MyThread().start()

6.7 Event

類物件Event也可以實現執行緒間同步,Event例項物件管理著一個內部標誌,通過改變這個內部標誌的值可以讓一個執行緒給其他處於阻塞狀態的執行緒傳送一個事件訊號。從而喚醒這些執行緒讓他們轉換為執行好狀態

  1. set(self):將內部標誌設定為True
  2. is_set(self):將判斷內部標誌時候給設定為True
  3. clear(self):將內部標誌設定為False
  4. wait(self,timeout=None):但內部標誌為False時,呼叫該方法的執行緒會阻塞。直到另外一個執行緒呼叫了set()將內部標誌設定為True,被阻塞的執行緒才會轉為執行狀態。
from threading import Thread, Event, current_thread

import time

event = Event()
print(event.is_set())  # False

def do_something():
    print("%s開始等待" % current_thread().name)
    event.wait()
    print("%s結束等待" % current_thread().name)

for i in range(3):
    Thread(target=do_something).start()

time.sleep(2)
event.set()

7. 程序間通訊

7.1 共享記憶體

如果想要實現程序之間的通訊,共享記憶體是常見的實現方式之一,它允許多個程序直接訪問同一塊記憶體

共享記憶體中的物件的型別必須是ctypes的,ctypes是與C語言相容的資料型別。為了在共享記憶體中建立ctypes型別的物件,標準庫模組multiprocessing提供了以下兩個類:

(1) Value(typecode_or_type,*args,**kwargs):

  • typecode_or_type : 用於指定數值的型別碼或ctypes型別

(2) Array(typecode_or_type,size_or_initializer,lock=True):    

  • typecode_or_type : 用於指定數值的型別碼或ctypes型別
  • size_or_initializer : 用於指定陣列的長度或Python中的序列
from multiprocessing import Value, Array, Process
import ctypes

# 在共享記憶體中建立一個表示數值的ctypes物件
num = Value('d', 2.3)
# num = Value(ctypes.c_float,2.3)

# 在共享記憶體中建立一個表示陣列的ctypes物件
arr = Array('i', range(1, 5))

# arr = Array(ctypes.c_int, range(1, 5))

def do_something():
    num.value = 1.8
    for i in range(len(arr)):
        arr[i] = -arr[i]

p = Process(target=do_something)
p.start()
p.join()

print(num.value)  # 1.8
print(arr[:])  # [-1, -2, -3, -4]

7.2 管道

函式Pipe(),其返回值是一個元組,元組中包含兩個物件,分別表示管道兩端的連線。

呼叫Pipe()時,如果不傳入引數或傳入的引數為True,管道的工作物件為雙向如果傳入的引數為False,管道的工作方式為單向,其中對於返回的元組,第一個連線物件只能接受資料,第二個連線物件只能傳送資料對於管道兩端的連線物件,主要有兩個方法:

  1. send(self,obj):用於將引數obj指定的物件傳送到管道
  2. recv(self):用於從管道接收物件
from multiprocessing import Pipe

conn1, conn2 = Pipe()  # 分別表示兩端的管道連線,引數為False表示單向,為True表示為雙向(預設)

conn1.send('conn1傳送資料1')
conn1.send('conn1傳送資料2')

conn2.send('conn2傳送資料1') 
conn2.send('conn2傳送資料2')

print(conn1.recv()) # conn2傳送資料1
print(conn1.recv()) # conn2傳送資料2
print(conn2.recv()) # conn1傳送資料1
print(conn2.recv()) # conn1傳送資料2

c1, c2 = Pipe(False)

# c1.send("c1")  # OSError: connection is read-only c1只能接收,不能傳送
# print(c2.recv())

c2.send("c2")
print(c1.recv()) # c2

7.3 Manager

如果想要實現程序之間的通訊,Manager也是常見的實現方式之一與共享記憶體相比,Manager更加靈活,因為它可以支援多種物件型別,此外,Manager還可以通過網路被不同計算機上的程序所共享。但是Manager的速度要比共享記憶體慢

from multiprocessing import Manager, Process

def func():
    d[1] = 18
    d['2'] = 56
    l.reverse()

manager = Manager()

# 通過manager建立了一個同於程序通訊的字典
d = manager.dict()

# 通過manager建立了一個同於程序通訊的列表
l = manager.list(range(5))
p = Process(target=func) # 子程序執行

p.start()
p.join()

# 主程序檢視資料時候被修改
print(d) # {1: 18, '2': 56}
print(l) # [4, 3, 2, 1, 0]

8. 定時器執行緒

如果想要在指定的時間片段之後再啟動子執行緒,可以使用標準庫模組threading提供的類物件Timer,用於表示定時器執行緒,Timer是Thread的子類,也可以通過方法start()來啟動執行緒。定時器只執行一次。如果需要每個一段時間執行一次,則需要在子執行緒呼叫的函式內部再次建立啟動子執行緒

from threading import Timer

def do_something():
    print("do something")
    
timer = Timer(2, do_something)
timer.start()

timer.cancel()  # 取消定時器

9. ThreadLocal

博觀而約取,厚積而薄發。

1. 多工介紹

多工是指在同一時間內執行多個任務,例如: 現在電腦安裝的作業系統都是多工作業系統,可以同時執行著多個軟體。

1.1 多工的執行方式

  • 併發

指在一個時間段內,有多個程式在同一個cpu上執行,但是任意時刻只有一個程式在cpu上執行

例如:

對於單核cpu處理多工,不可能存在同一時刻有多個軟體同時執行,只不過是CPU執行時間劃分成若干個時間段交給不同的程式執行,他們之間是交替執行。但是由於cpu執行速度太快,所以我們感覺就像這些軟體都在同時執行一樣。

  • 並行

在任意時刻,有多個程式同時執行在多個cpu上

例如:

對於多核cpu處理多工,由於各個任務不存在爭奪cpu的執行許可權,也就保證了每個任務獨自佔用一個cpu核心,此時每個任務在任意時刻他們都是同時執行,不存在交替執行的過程。前提是在任務數小於cpu核數,但是無論任務數有多少,任意時刻不同cpu核心上的任務都是同時執行的。

  • 區別:

併發和並行是即相似又有區別的兩個概念,並行是指兩個或者多個事件在同一時刻發生;而併發是指兩個或多個事件在同一時間間隔內發生。在多道程式環境下,併發性是指在一段時間內巨集觀上有多個程式在同時執行,但在單處理機系統中,每一時刻卻僅能有一道程式執行,故微觀上這些程式只能是分時地交替執行。倘若在計算機系統中有多個處理機,則這些可以併發執行的程式便可被分配到多個處理機上,實現並行執行,即利用每個處理機來處理一個可併發執行的程式,這樣,多個程式便可以同時執行。

2. 程序

在Python程式中,想要實現多工可以使用程序來完成,程序是實現多工的一種方式。一個正在執行的程式或者軟體就是一個程序,它是作業系統進行資源分配的基本單位,也就是說每啟動一個程序,作業系統都會給其分配一定的執行資源(記憶體資源)保證程序的執行。

注意:

一個程式執行後至少有一個程序,一個程序預設有一個執行緒稱為主執行緒,程序裡面可以建立多個執行緒,執行緒是依附在程序裡面的,沒有程序就沒有執行緒

2.1多程序使用

方式--fork

標準庫模組os中提供一個函式fork(),用於將當前程序複製一份子程序,而後父程序和子程序從呼叫fork()處開始分叉,兵分兩路,繼續並行執行後面的程式與普通函式不同的是,函式fork()會返回兩次,分別在父程序和子程序內返回,返回值分為三種情況:

  1. 返回值小於0,表示複製子程序失敗
  2. 返回值等於0,表示處於在子程序中
  3. 返回值大於0,表示處於父程序中,返回值就是子程序的id

windows作業系統中無法呼叫函式fork()

import os

try:
    pid = os.fork()
except OSError:
    print("作業系統不支援")
    exit()
if pid < 0:
    print("複製子程序失敗")
elif pid == 0:
    print("我是子程序%d,我的父程序是%d" % (os.getpid(), os.getppid()))
else:
    print("我是父程序%d,我的子程序是%d" % (os.getpid(), pid))

# 我是父程序30007,我的子程序是30009
# 我是子程序30009,我的父程序是30007

方式一:

標準庫模組multiprocessing提供了一個類物件Process,用於表示程序

  1. 根據類物件Process建立程序例項物件

  2. 呼叫程序例項物件的方法start()啟動程序,呼叫start()後,會自動呼叫run(),方法run()會自動呼叫target指定的函式

from multiprocessing import Process, current_process

print("父程序啟動(%d----%s)" % (current_process().pid, current_process().name))

def do_something(*args):
    print("子程序啟動(%d----%s)" % (current_process().pid, current_process().name))
    print(args)
    print("子程序結束(%d----%s)" % (current_process().pid, current_process().name))

process = Process(target=do_something, args=(5, 10))
process.start()

import time

time.sleep(2)
print("父程序結束(%d----%s)" % (current_process().pid, current_process().name))

Process的__init__方法:

def __init__(self, group=None, target=None, name=None, args=(), kwargs={},*, daemon=None):
  • group:用於指定程序例項物件所屬的程序組,預設不屬於任何程序組
  • target:用於制定被run()呼叫的函式,預設沒有函式被呼叫
  • name:用於制定建立程序例項物件的名稱,第n個子程序的預設名稱為Process-n
  • args:用於制定target接受的未知引數,用元組表示,預設不接受位置引數
  • kwargs:用於制定target接受的關鍵值引數,用字典表示,預設不接受關鍵字引數
  • daemon:守護程序

方式二:

  1. 自定義繼承自Process的類物件,重寫特殊方法__init__()run()
  2. 根據自定義的類物件建立程序例項物件
  3. 呼叫程序例項物件的方法start()啟動程序,呼叫start()後,會自動呼叫重寫的run()
from multiprocessing import Process, current_process

print("父程序啟動(%d----%s)" % (current_process().pid, current_process().name))

class MyProcess(Process):
    def __init__(self, name, args):
        super().__init__(name=name)
        self.args = args

    def run(self):
        print("子程序啟動(%d----%s)" % (current_process().pid, current_process().name))
        print(self.args)
        print("子程序結束(%d----%s)" % (current_process().pid, current_process().name))

m = MyProcess(name="MyProcess", args=(5, 10))
m.start()

import time

time.sleep(2)
print("父程序結束(%d----%s)" % (current_process().pid, current_process().name))

2.2 多程序執行的不確定性

from multiprocessing import Process, current_process

import time

def do_something():
    for i in range(4):
        print("%s:%d" % (current_process().name, i))
        time.sleep(2)

for i in range(3):
    Process(target=do_something).start()

do_something()

執行結果:

MainProcess:0
Process-1:0
Process-2:0
Process-3:0
MainProcess:1
Process-1:1
Process-2:1
Process-3:1
Process-1:2
MainProcess:2
Process-2:2
Process-3:2
Process-1:3
MainProcess:3
Process-2:3
Process-3:3

本次測試程式啟動了三個子程序去執行do_something方法,之後主程序也執行該方法,通過執行結果可以看出,每個程序之間執行是沒有順序。預設情況下,多個程序的執行順序和時間都是不確定的,完全取決於作業系統的排程

2.3 守護程序

import time
from multiprocessing import Process, current_process

print("父程序%d啟動" % current_process().pid)

class MyProcess(Process):
    def run(self):
        print("子程序%d啟動" % current_process().pid)
        time.sleep(2)
        print("子程序%d結束" % current_process().pid)

m = MyProcess()
# m = MyProcess(daemon=True)
m.daemon = True
m.start()

print("父程序%d結束" % current_process().pid)

>>>父程序13717啟動
>>>子程序13721啟動
>>>父程序13717結束

可以在呼叫程序例項物件方法的start()之前,將屬性daemon的值設定為True,或直接通過建立物件傳入關鍵字引數,從而將程序設定為守護程序。守護程序是為了守護父程序而存在的子程序,當父程序結束時,守護程序就沒有了存在的意義,因此,守護程序會隨著父程序的結束而結束

2.4 阻塞父程序的子程序方法join

比如:現在有個需求,需要在子程序的程式執行完成以後接著主程序繼續執行,但是我們不知道子程序需要執行多長時間,此時就需要使用join方法。他能保證無論子程序執行多久,只有在子程序執行完畢以後,才會執行join方法之後的程式,此時方法join就處於阻塞狀態。

import time
from multiprocessing import Process, current_process

print("父程序%d啟動" % current_process().pid)

class MyProcess(Process):
    def run(self):
        print("子程序%d啟動" % current_process().pid)
        time.sleep(2)
        print("子程序%d結束" % current_process().pid)

m = MyProcess()
m.start()
# m.join()
m.join(1)
print("父程序%d結束" % current_process().pid)

>>>父程序13921啟動
>>>子程序13926啟動
>>>父程序13921結束
>>>子程序13926結束

在父程序中建立並啟動子程序後,可以呼叫子程序的方法join(),這樣子程序會把父程序阻塞,父程序會等子程序執行完之後再從被阻塞的地方繼續執行

在呼叫join()時,可以制指定引數timeout,從而指定子程序阻塞父程序的時間

2.5 全域性變數多個程序不共享

from multiprocessing import Process

NUM = 18

def do_something():
    global NUM
    NUM += 1

p = Process(target=do_something)
p.start()
p.join()

# 在子程序中修改全域性變數,對父程序中的全域性變數沒有影響
# 因為:子程序對父程序中的全域性變數做了一份拷貝,子程序與父程序中的全域性變數NUM是完全不同的兩個變數
print(NUM)  # 18

每個程序都有獨立的記憶體空間,從而程序間是相互獨立的,因此,全域性變數在多個程序之間不能共享。

2.6 多程序操作共享資料是不安全的

from multiprocessing import Process, Value

NUM = Value('i', 0)

def do_something():
    global NUM
    for i in range(10000):
        # 相當於:num = num +1
        # 首先計算num+1,存入臨時變數中,然後將臨時變數的值賦給NUM
        NUM.value += 1

p1 = Process(target=do_something)
p2 = Process(target=do_something)

p1.start()
p2.start()
p1.join()
p2.join()

print(NUM.value)  # 小於20000

由於多程序的執行是不確定的,導致多程序操作共享資料的結果是不可預期的,這也常被稱為不安全的

3. 執行緒

在Python中,想要實現多工除了使用程序,還可以使用執行緒來完成,執行緒是實現多工的另外一種方式。執行緒是程序中執行程式碼的一個分支,每個執行分支(執行緒)要想工作執行程式碼需要cpu進行排程 ,也就是說執行緒是cpu排程的基本單位,每個程序至少都有一個執行緒,而這個執行緒就是我們通常說的主執行緒。

3.1 多執行緒使用

任何程序都會自動建立並啟動一個執行緒,該執行緒被稱為父(主)執行緒,預設名稱是MainThread通過父程序建立的子程序也會自動建立一個執行緒

方式一:

標準庫模組threading提供了一個類物件Thread,用於表示執行緒

  1. 根據類物件Thread建立執行緒例項物件

  2. 呼叫執行緒例項物件的方法start()啟動執行緒,呼叫start()後,會自動呼叫run(),方法run()會自動呼叫target指定的函式

from threading import Thread, currentThread

print("父執行緒啟動(%d----%s)" % (currentThread().ident, currentThread().name))


def do_something(*args):
    print("子執行緒啟動(%d----%s)" % (currentThread().ident, currentThread().name))
    print(args)
    print("子執行緒結束(%d----%s)" % (currentThread().ident, currentThread().name))


thread = Thread(target=do_something, args=(5, 10), name="MyThread")
thread.start()

import time

time.sleep(2)
print("父執行緒結束(%d----%s)" % (currentThread().ident, currentThread().name))

# 執行結果:
>>>父執行緒啟動(140450223359808----MainThread)
>>>子執行緒啟動(140450214668032----MyThread)
>>>(5, 10)
>>>子執行緒結束(140450214668032----MyThread)
>>>父執行緒結束(140450223359808----MainThread)

Thread的__init__方法:

def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):
  • group:用於指定執行緒例項物件所屬的執行緒組,預設不屬於任何執行緒組
  • target:用於制定被run()呼叫的函式,預設沒有函式被呼叫
  • name:用於制定建立執行緒例項物件的名稱,第n個子執行緒的預設名稱為Thread-n
  • args:用於制定target接受的未知引數,用元組表示,預設不接受位置引數
  • kwargs:用於制定target接受的關鍵值引數,用字典表示,預設不接受關鍵字引數
  • daemon:用於制定執行緒例項物件是否是守護執行緒,預設不是守護執行緒

方式二:

  1. 自定義繼承自Thread的類物件,重寫特殊方法__init__()run()
  2. 根據自定義的類物件建立執行緒例項物件
  3. 呼叫執行緒例項物件的方法start()啟動執行緒,呼叫start()後,會自動呼叫重寫的run()
from threading import Thread, current_thread

print("父執行緒啟動(%d----%s)" % (current_thread().ident, current_thread().name))

class MyThread(Thread):
    def __init__(self, name, args):
        super().__init__(name=name)
        self.args = args

    def run(self):
        print("子執行緒啟動(%d----%s)" % (current_thread().ident, current_thread().name))
        print(self.args)
        print("子執行緒結束(%d----%s)" % (current_thread().ident, current_thread().name))

m = MyThread(name="MyThread", args=(5, 10))
m.start()

import time

time.sleep(2)
print("父執行緒結束(%d----%s)" % (current_thread().ident, current_thread().name))


# 執行結果:
>>>父執行緒啟動(140450223359808----MainThread)
>>>子執行緒啟動(140450214668032----MyThread)
>>>(5, 10)
>>>子執行緒結束(140450214668032----MyThread)
>>>父執行緒結束(140450223359808----MainThread)

3.2 多執行緒執行的不確定性

同多程序。預設情況下,多個執行緒的執行順序和時間都是不確定的,完全取決於作業系統的排程

from threading import Thread, current_thread

import time

def do_something():
    for i in range(5):
        print("%s:%d" % (current_thread().name, i))
        time.sleep(2)

for i in range(3):
    Thread(target=do_something).start()

do_something()

執行結果:

Thread-1:0
Thread-2:0
Thread-3:0
MainThread:0
Thread-2:1
Thread-3:1
MainThread:1
Thread-1:1
Thread-2:2
Thread-3:2
MainThread:2
Thread-1:2
Thread-2:3
Thread-3:3
Thread-1:3
MainThread:3

3.3 守護執行緒

在建立執行緒例項物件時可以將引數daemon的值設定為True,從而將執行緒設定為守護執行緒,守護執行緒是為了守護父執行緒而存在的子執行緒,當父執行緒結束時,守護執行緒就沒有了存在的意義,因此,守護執行緒會隨著父執行緒的結束而結束

import time
from threading import Thread, current_thread

print("父執行緒%d啟動" % current_thread().ident)

class MyThread(Thread):

    def run(self):
        print("子執行緒%d啟動" % current_thread().ident)
        time.sleep(2)
        print("子執行緒%d結束" % current_thread().ident)

# m = MyThread(daemon=True)
m = MyThread()
m.setDaemon(True)
# m.daemon = True
m.start()

time.sleep(1)

print("父執行緒%d結束" % current_thread().ident)

3.4 阻塞父執行緒的子執行緒方法join

在父執行緒中建立並啟動子執行緒後,可以呼叫子執行緒的方法join(),這樣子執行緒會把父執行緒阻塞,父執行緒會等子執行緒執行完之後再從被阻塞的地方繼續執行在呼叫join()時,可以制指定引數timeout,從而指定子執行緒阻塞父執行緒的時間

import time
from threading import Thread, current_thread

print("父執行緒%d啟動" % current_thread().ident)

class MyThread(Thread):
    def run(self):
        print("子執行緒%d啟動" % current_thread().ident)
        time.sleep(2)
        print("子執行緒%d結束" % current_thread().ident)

m = MyThread()
m.start()
# m.join()
m.join(1)
print("父執行緒%d結束" % current_thread().ident)

3.5 全域性變數在程序的所有執行緒中可以共享

from threading import Thread

NUM = 18

def do_something():
    global NUM
    NUM += 1

t = Thread(target=do_something)
t.start()
t.join()

print(NUM)  # 19

程序內的所有執行緒共享記憶體空間,所以全域性變數在程序的所有執行緒中可以共享

3.6 多執行緒操作共享資料是不安全的

from threading import Thread
import dis

NUM = 0

def do_something():
    global NUM
    for i in range(1000000):
        # 相當於:num = num +1
        # 首先計算num+1,存入臨時變數中,然後將臨時變數的值賦給NUM
        NUM += 1

t1 = Thread(target=do_something)
t2 = Thread(target=do_something)

t1.start()
t2.start()
t1.join()
t2.join()

print(NUM)  # 小於2000000

由於多執行緒的執行是不確定的,導致多執行緒操作共享資料的結果是不可預期的,這也常被稱為不安全的

4. 程序池

4.1 程序池Pool

如果併發的任務數過多,一次性建立並啟動大量的程序會給計算機帶來很大的壓力,那麼就可以使用程序池對建立與啟動的程序進行限制和管理程序池中能所容納的程序數目是固定的。

標準庫模組multiprocessing中提供一個類物件Pool,用於表示程序池,程序池中所能容納的程序數目可以在建立Pool例項物件時程序指定,如果不指定,預設大小是CPU的核數

from multiprocessing import Pool
import time, random

print("父程序啟動")

def do_something(i):
    print("子程序%d啟動" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子程序%d結束,耗時%.2f秒" % (i, end - start))

p = Pool(3) # 將程序池所能容納的最大程序數指定為3

for i in range(1, 11):
    # 與start()類似,不同的是,建立並啟動有程序池管理的子程序
    p.apply_async(do_something, args=(i,))

# 呼叫方法join()之前,必須呼叫close()
# 呼叫close()之後就不能讓程序池在管理新的程序了
p.close()

# 父程序被阻塞
# 程序池管理的所有子程序執行完之後,父程序再從被阻塞的地方繼續執行
p.join()

print("父程序結束")

# 程式執行後同時建立並執行3個子程序,第四個子程序要等前面三個中的某一個執行完畢之後,才會建立並啟動

4.2 ProcessPoolExecutor

在標準庫模組concurrent.futures中提供了一個類物件ProcessPoolExecutor,也用於表示程序池。與Pool相比,ProcessPoolExecutor的功能和效能更加強大。

from concurrent.futures import ProcessPoolExecutor
import time, random

print("父程序啟動")

def do_something(i):
    print("子程序%d啟動" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子程序%d結束,耗時%.2f秒" % (i, end - start))

# 將程序池所能容納的最大程序數指定為3
p = ProcessPoolExecutor(max_workers=3)

# 將需要程序池處理的任務全部交給程序池,此後會建立並啟動由程序池管理的子程序
for i in range(1, 11):
    p.submit(do_something, i)

# 父程序被阻塞
# 程序池管理的所有子程序執行完之後,父程序再從被阻塞的地方繼續執行
p.shutdown(wait=True)

print("父程序結束")

ProcessPoolExecutor的父類物件Executor遵守了上下文管理解析,所以可以使用with語句,這樣在離開執行時上下文是會自動呼叫shutdown(wait=True)方法

原始碼 concurrent.futures._base.py 下面的類物件Executor

def __enter__(self):
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
with ProcessPoolExecutor(max_workers=3) as p:
    """
    for i in range(1, 11):
        p.submit(do_something, i)
    """
    # 等價於:
    p.map(do_something, range(1, 11))

4.3 ProcessPoolExecutor物件方法

  1. submit(self, fn, *args, **kwargs):

返回值是一個Future例項物件,表示子程序所呼叫的那個函式的執行,比如:do_something(),可以呼叫Futureresult()得到這個函式的返回值。方法result()是一個同步方法,直到這個函式執行完畢之後方法result()才會返回

with ProcessPoolExecutor(max_workers=5) as p:
     for i in range(1, 11):
        future = p.submit(do_something, i)
        # 同步,需要等待do_something執行完畢
        print(future.result())

p.submit(do_something, i)是立刻返回,可以通過列表儲存返回的物件,之後遍歷獲取返回值

with ProcessPoolExecutor(max_workers=3) as p:
    obj_list = []
    for i in range(1, 11):
        future = p.submit(do_something, i)
        # 非同步,無需等待do_something執行完畢
        print(future)
        obj_list.append(future)

for obj in obj_list:
    print(obj.result())
  1. wait(fs, timeout=None, return_when=ALL_COMPLETED):

該函式用於阻塞父程序,以等待指定的Future例項物件序列,直到滿足條件

  • fs : 用於指定要等待的Future物件例項序列

  • timeout : 用於指定等待的最長時間,如果指定為None或不指定,則一直等待

  • return_when : 用於指定這個函式何時反覆,有三種取值:

    • FIRST_COMPLETED :當第一個Future例項物件已經完成或取消時

    • FIRST_EXCEPTION :當第一個Future例項物件丟擲異常時

    • ALL_COMPLETED :當所有Future例項物件已經完成或已被取消時

該函式的返回值是有兩個集合組成的元組,第一個集合包含了已經完成或已被取消的所有Future例項物件,第二個集合包含了沒有完成並且沒有被取消的Future例項物件

from concurrent.futures import ProcessPoolExecutor, wait, as_completed, ALL_COMPLETED, FIRST_COMPLETED

import time, random

def do_something(i):
    time.sleep(random.random() * 10)
    return i * i

p = ProcessPoolExecutor(max_workers=3)
objs = []

for i in range(1, 4):
    future = p.submit(do_something, i)
    objs.append(future)
    
(done, not_done) = wait(objs, return_when=ALL_COMPLETED)
print(done)  # 已經完成的Future物件例項的集合
print(not_done)  # 未完成的Future物件例項的集合
  1. as_completed(fs, timeout=None):

該函式用於將指定的Future例項物件序列轉換為一個迭代器,當序列中的任意一個Future例項物件已經完成或已被取消時都會被yield。這樣,通過遍歷得到的迭代器,就可以在任意一個Future例項物件已經完成或已被取消時立即做一些處理,比如:呼叫result()方法得到執行結果

  • fs : 用於指定Future例項物件的序列
  • timeout : 用於指定超時時間,如果指定為None或不指定,則不會超時
for i in range(1, 4):
    future = p.submit(do_something, i)
    objs.append(future)

future_iterator = as_completed(objs)
for future in future_iterator:
    print(future.done())
    print(future.cancel())
    print(future.result())

# executor.done() # 檢視任務是否完成(True/False)
# executor.cancel() # 取消等待的任務,在執行中或執行完成的任務無法取消
# executor.result() # 獲取任務函式返回的結果

5. 執行緒池

5.1 執行緒池ThreadPool

第三方庫threadpool中提供一個類物件ThreadPool,用於表示執行緒池,執行緒池中所能容納的執行緒數目可以在建立ThreadPool例項物件時執行緒指定,如果不指定,預設大小是CPU的核數

from threadpool import ThreadPool, makeRequests
import time, random

print("父執行緒啟動")

def do_something(i):
    print("子執行緒%d啟動" % i)
    start = time.time()

    time.sleep(random.random() * 10)
    end = time.time()
    print("子執行緒%d結束,耗時%.2f秒" % (i, end - start))

t = ThreadPool(3)

# 建立需要執行緒池處理的任務
requests = makeRequests(do_something, args_list=range(1, 11))

# 將需要執行緒池處理的任務全部交給執行緒池,此後會建立並啟動執行緒由執行緒池管理的子執行緒
for req in requests:
    t.putRequest(req)

# 父執行緒被阻塞
# 執行緒池管理的所有子執行緒執行完之後,互相陳再從阻塞的地方繼續執行
t.wait()

print("父執行緒結束")

# 程式執行後同時建立並執行3個子執行緒,第四個子執行緒要等前面三個中的某一個執行完畢之後,才會建立並啟動

5.2 ThreadPoolExecutor

與程序池ProcessPoolExecutor用法一致

6. 同步(適用於程序和執行緒)

為了保證多個執行緒(程序)能安全的操作共享資料,必須確保一個執行緒(程序)在操作共享資料時,其他執行緒(程序)都不能操作。

一個執行緒(程序)A在操作共享資料前必須先試圖獲得鎖,從而給相關程式碼上鎖,執行緒A獲得鎖之後,鎖的狀態變成為locked。如果另外一個執行緒(程序)B試圖獲得鎖,執行緒(程序)B的狀態會變成為blocked並且被新增到鎖等待池,只能等待獲得鎖的執行緒(程序)A在釋放鎖之後,鎖的狀態變為unlocked,執行緒(程序)排程程式再從鎖等待池中處於狀態blocked的執行緒(程序)中選擇一個來獲得鎖,獲的鎖之後該執行緒(程序)的狀態變為running。由於只有一把鎖,無論多少個執行緒(程序),同一時刻最多隻有一個執行緒(程序)能獲得該鎖,這樣就確保了操作共享資料的相關程式碼只能有一個執行緒(程序)從頭到尾完成的執行,從而確保了多個執行緒(程序)操作共享資料總是安全的。但是包含鎖的相關程式碼只能以單執行緒模式執行,因此效率大大降低了。

6.1 互斥鎖Lock

  • 多執行緒同步之Lock

標準庫模組threading中提供了一個類物件Lock,用於表示鎖,以實現多執行緒之間的同步簡單的說:同步就意味著”阻塞和等待“。為了保證獲得鎖的執行緒(程序)用完了後一定要釋放鎖,可以將操作共享資料的程式碼放在try語句塊中,把釋放鎖的程式碼放在finally語句塊。由於類物件Lock遵循了上下文管理協議,所以可以使用with語句進行簡化,這樣,在進入執行時上下文是會自動呼叫方法require(),在離開執行時上下文是會自動呼叫release()

  • acquire(blocking=True, timeout=-1):

    • 當浮點型 timeout 引數被設定為正值呼叫時,只要無法獲得鎖,將最多阻塞 timeout 設定的秒數。
    • 當呼叫時引數 blocking 設定為 True (預設值),阻塞直到鎖被釋放,然後將鎖鎖定並返回 True 。如果blocking 被設定為 False 的情況下呼叫,將不會發生阻塞。如果呼叫時已經鎖定,則立即返回 False ,表示獲取鎖失敗;否則,將鎖鎖定並返回 True。
  • release():

    • 釋放一個鎖。這個方法可以在任何執行緒中呼叫,不單指獲得鎖的執行緒。當鎖被鎖定,將它重置為未鎖定,並返回。如果其他執行緒正在等待這個鎖解鎖而被阻塞,只允許其中一個允許。
from threading import Thread, Lock

lock = Lock() # 定義鎖,預設狀態是解鎖
NUM = 0

def do_something():
    global NUM
    for i in range(1000000):
        """
        lock.acquire() # 上鎖
        try:
            NUM += 1
        finally:
            lock.release() # 釋放鎖(解鎖)
        """
        with lock:
            NUM += 1

t1 = Thread(target=do_something)
t2 = Thread(target=do_something)

t1.start()
t2.start()
t1.join()
t2.join()

print(NUM)  # 2000000
  • 多程序同步之Lock

標準庫模組multiprocessing中提供了一個類物件Lock,用於表示鎖,以實現多程序之間的同步.

from multiprocessing import Process, Lock, Value

lock = Lock()
NUM = Value('i', 0)

def do_something():
    global NUM
    for i in range(1000000):
        """
        lock.acquire()
        try:
            NUM += 1
        finally:
            lock.release()
        """
        with lock:
            NUM.value += 1

p1 = Process(target=do_something)
p2 = Process(target=do_something)

p1.start()
p2.start()
p1.join()
p2.join()

print(NUM.value)  # 2000000

6.2 死鎖

6.2.1 執行緒死鎖

如果有多個共享資料,在多個執行緒操作這多個共享資料時,如果兩個執行緒分別通過加鎖佔有一部分共享資料,並且同時等待對方釋放鎖,這樣就會導致兩個執行緒永遠相互等待而產生死鎖。要避免程式中出現死鎖的情況,在避免死鎖的演算法中最有代表性的演算法是Dijkstra提出的銀行家演算法

from threading import Thread, Lock, current_thread

numa = 0
numb = 0
locka = Lock()
lockb = Lock()

def do_something():
    fun1()
    fun2()

def fun1():
    global numa, numb
    locka.acquire()

    try:
        print("%s------func1()-------locka" % current_thread().ident)
        numa += 1
        lockb.acquire()
        try:
            print("%s------func1()-------lockb" % current_thread().ident)
            numb += 1
        finally:
            lockb.release()
    finally:
        locka.release()

def fun2():
    global numa, numb
    lockb.acquire()
    try:
        print("%s------func2()-------lockb" % current_thread().ident)
        numb += 1
        locka.acquire()
        try:
            print("%s------func2()-------locka" % current_thread().ident)
            numb += 1
        finally:
            locka.release()
    finally:
        lockb.release()

t_list = []
for i in range(100):
    t = Thread(target=do_something)
    t_list.append(t)
    t.start()

for item in t_list:
    item.join()

print("父執行緒結束")

執行緒一執行func1(),優先獲得locka的鎖,此時執行緒二執行func2(),優先獲得lockb鎖,之後方法func1()需要獲取lockb鎖,方法func2()需要獲取locka鎖,也就是各個執行緒都需要另外一個執行緒的資源,但是在獲取此前他們各自都沒有解鎖,也就導致他們都沒法上鎖,始終保持僵持的狀態。這一種狀態就是死鎖。

6.2.2 程序死鎖

程序通過共享記憶體訪問全域性變數,同樣可以利用上述測試程式碼

from multiprocessing import Process, Lock, current_process,Value

numa = Value('i',0)

numb = Value('i',0)

locka = Lock()
lockb = Lock()

......

6.3 RLock

在同一執行緒中,當呼叫了Lock的方法acquire()之後,如果在呼叫方法release()之前再次呼叫了方法acquire(),也會導致死鎖

from threading import Thread, current_thread, RLock, Lock

lock = Lock()
lock.acquire()
print("獲得鎖")

lock.acquire()
print("獲得鎖")

lock.release()
print("釋放鎖")

lock.release()
print("釋放鎖")

>>>獲得鎖
>>>

標準庫模組threading中還提供了一個用於表示鎖的類物件RLock(可重入鎖)。與Lock相同的是:RLock也提供了獲得鎖的方法acquire(),和釋放鎖的方法release()Lock不同的是:在同一個執行緒中,當呼叫了RLock的方法acquire()之後。可以再呼叫方法release()之前可以多次呼叫方法acquire()而不會導致死鎖。

lock = RLock()

lock.acquire()
print("獲得鎖")
lock.acquire()
print("獲得鎖")
lock.release()
print("釋放鎖")
lock.release()
print("釋放鎖")

>>>獲得鎖
>>>獲得鎖
>>>釋放鎖
>>>釋放鎖

原始碼分析:

在RLock的內部維護了一個Lock和計數器count。count記錄了鎖被acquire的次數。當執行緒第一次呼叫方法acquire()獲的鎖是,鎖的擁有者被儲存,同時計數器count被初始化為1;但再次呼叫方法acquire()時首先會判斷呼叫這是否是鎖的擁有者。如果是,計數器count加一。RLock相當於一個門可以上多把鎖,上多少鎖就的開多少把鎖,因此方法acquire()和release()必須成對出現,如果在某個執行緒中,呼叫了n次acquire(),必須呼叫n次release()才能釋放該執行緒所佔用的鎖

class _RLock:

    def __init__(self):
        self._block = _allocate_lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        me = get_ident() # 獲取執行緒id
        if self._owner == me: # 判斷當前執行緒時候是自己,如果是第一次上鎖,__owner預設是None,則不執行
            self._count += 1 # 如果執行緒已經上過一次鎖,則計數器加一
            return 1
        rc = self._block.acquire(blocking, timeout) # 呼叫底層Lock鎖的acquire方法,上鎖成功返回True,否則阻塞
        if rc:
            self._owner = me # __owner記錄為當前執行緒id
            self._count = 1 # 計數器設定為1
        return rc # 返回布林值 

    __enter__ = acquire

    def release(self):
        if self._owner != get_ident(): # 判斷呼叫釋放鎖的執行緒id是否為自己,意思就是解鈴還須繫鈴人
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1 # 計數器減一
        if not count: # 計數器為0時表示解鎖完成
            self._owner = None # 取消當前執行緒與鎖的繫結
            self._block.release() # 呼叫底層Lock鎖的release方法進行解鎖

    def __exit__(self, t, v, tb):
        self.release()

6.4 Condition

標準庫模組threading中提供了一個類物件Condition,用於表示帶觸發條件的鎖,以幫助我們處理多執行緒間複雜的同步問題,Condition允許一個或多個執行緒等待觸發條件,直到收到另一個執行緒的通知

from threading import Thread, Condition
import time

cond = Condition()


class MyThread1(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        cond.acquire()
        print("%s:快來gank" % self.name)

        cond.wait()

        time.sleep(2)
        print("%s:怎麼還不來啊,都快越塔了" % self.name)
        cond.notify()

        cond.wait()
        # 思考兩秒再說
        time.sleep(2)
        print("%s:first blood" % self.name)
        cond.notify()

        cond.release()


class MyThread2(Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        cond.acquire()
        # 思考2秒
        time.sleep(2)
        print('%s:打完f6就來' % self.name)
        cond.notify()

        cond.wait()
        time.sleep(2)
        print('%s:馬上,你在猥瑣一下' % self.name)
        cond.notify()

        cond.wait()
        time.sleep(2)
        print('%s:我去,你好坑啊' % self.name)

        cond.release()


MyThread1("中單").start()
MyThread2("打野").start()


>>>中單:快來gank
>>>打野:打完f6就來
>>>中單:怎麼還不來啊,都快越塔了
>>>打野:馬上,你在猥瑣一下
>>>中單:first blood
>>>打野:我去,你好坑啊

分析:
分別定義兩個執行緒,為Thread1和Thread2,執行緒啟動之後兩個執行緒爭先上鎖(此時的上鎖是必須的,預設的鎖是RLock鎖)。假如執行緒1搶到鎖,之後可以執行一部分邏輯(列印,中單:快來gank),而執行緒2則處於阻塞狀態,當執行緒1呼叫wait時,底層會釋放一級鎖(RLock),並且儲存執行緒的狀態,然後建立一個二級鎖(Lock鎖),並且執行acquire上鎖,還把該鎖存入到了一個雙端佇列,之後再次呼叫acquire阻塞自身。這個時候執行緒1釋放了RLock,則執行緒2就可以上鎖,同樣也執行一部分邏輯,之後需要呼叫notify發起通知(不能呼叫wait,會引起死鎖),預設只發起一個通知,從佇列中拿出隊頭的二級鎖,進行解鎖,之後從佇列中移除該二級鎖。解鎖之後,執行緒1的二級鎖就可以上鎖,之後便恢復執行緒的狀態,執行緒2處理完邏輯(打野:打完f6就來),呼叫wait阻塞等待,此時的執行緒1恢復了,就從wait被喚醒,執行邏輯(中單:怎麼還不來啊,都快越塔了),想要執行緒2執行就必須呼叫notify通知。

原始碼分析:

class Condition:
    # 條件變數允許一個或多個執行緒進入到等待狀態,直到它們被其他執行緒喚醒

    def __init__(self, lock=None):
        # 如果傳入的lock為空則預設初始化一個RLock例項
        if lock is None:
            lock = RLock()
        self._lock = lock

        # 將Condition的acquire()和release()方法設定為RLock的acquire()和release()方法
        self.acquire = lock.acquire 
        self.release = lock.release 
        
        # 三個方法的代理;因為Lock沒有實現這三個方法,就會預設使用Condition的方法
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass

        # 用雙端佇列記錄一個waiting池
        self._waiters = _deque() 

    def __enter__(self):
        return self._lock.__enter__() # 呼叫可重入鎖的__enter__()方法,本質是呼叫Lock的acquire方法

    def __exit__(self, *args):
        return self._lock.__exit__(*args) # 呼叫可重入鎖的__exit__()方法,本質是呼叫Lock的release方法
    
    """
    對於RLock而言,呼叫release()方法,並不一定會真正的釋放鎖。
    因此,它提供了_release_save()方法,該方法會真正的釋放鎖,
    並且將RLock內部維護的狀態,返回給呼叫方。
    之後,執行緒再次獲取到底層鎖之後,再將狀態重置回去
    RLock內部會維護兩個狀態:owner-擁有鎖的執行緒的id,count-該執行緒獲取了多少次鎖
    """
    # 只有底層鎖沒有_release_save()和_acquire_restore()方法時,才用下面的實現
    def _release_save(self):
        self._lock.release()           # No state to save

    def _acquire_restore(self, x):
        self._lock.acquire()           # Ignore saved state

    def _is_owned(self):
        """
        如果當前執行緒持有鎖,則返回True,否則返回False,其過程是:
      - 使用非阻塞的方式獲取鎖,如果獲取成功,則表明當前執行緒,沒有鎖,那麼釋放剛剛獲取到的鎖,並返回False;
          - 否則,認為當前執行緒持有鎖,返回True。
        """
        
        # 只有當底層鎖沒有_is_owned()方法時,才會用這種方式判斷當前執行緒是否擁有底層鎖
        # 因為RLock具有_is_owned()方法,所以,它的物件不會使用這裡的_is_owned()方法
        if self._lock.acquire(0): 
            self._lock.release()
            return False # 
        else:
            return True

    def wait(self, timeout=None):
        # 1,如果當前執行緒,沒有上鎖,那麼丟擲異常
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")

        # 2,設定一個二級鎖(Lock),並且為它上鎖,然後放入佇列。注意這裡的上鎖操作,利用Lock的特性,下面就會上演如何輕鬆實現主動阻塞。
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)

        # 3,釋放底層鎖,並儲存鎖物件的內部狀態
        # 釋放一級鎖。這裡可以看下Rlock的實現,因為Rlock的計數器可能大於1的,這裡提供了私人的方法一步到位的將 self._owner 、 self._count 提取出來。
        saved_state = self._release_save()

     
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                # 3.1,如果timeout是None,那麼再次以阻塞的方式獲取二級鎖(Lock)物件
                #  - 因此當前執行緒已經獲取了一次該鎖,因此當前執行緒會阻塞,直到其他執行緒釋放該鎖
                # 再次對二級鎖進行上鎖操作,很輕巧的實現自我阻塞,等待其他執行緒呼叫 notify() 進行喚醒。
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    # 3.2,如果設定了timeout為正數,阻塞等待設定的時間獲取鎖,獲取到返回True,否則返回False
                    gotit = waiter.acquire(True, timeout)
                else:
                    # 3.3,如果設定了timeout為負數,則以非阻塞的方式獲取,沒有獲取到直接返回False
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)  # 被喚醒後,重新獲得對一級鎖的擁有。將saved_state資料進行恢復。
            if not gotit:
                try:
                    self._waiters.remove(waiter) 
                except ValueError:
                    pass

    def wait_for(self, predicate, timeout=None):
        endtime = None
        waittime = timeout
        result = predicate()
        while not result:
            if waittime is not None:
                if endtime is None:
                    endtime = _time() + waittime
                else:
                    waittime = endtime - _time()
                    if waittime <= 0:
                        break
            self.wait(waittime)
            result = predicate()
        return result

    def notify(self, n=1):
        # 1,如果當前執行緒,沒有上鎖,那麼丟擲異常
        if not self._is_owned(): # 判斷
            raise RuntimeError("cannot notify on un-acquired lock")
        
        # 備份當前執行緒佇列;定義新得佇列,將前n個執行緒轉移到waiters_to_notify
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))

        # 佇列為空直接返回
        if not waiters_to_notify:
            return
        
        # 佇列不為空,迴圈釋放二級鎖,wait函式中的wait.acquire()喚醒;最後嘗試將該鎖踢出佇列
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        self.notify(len(self._waiters))

    notifyAll = notify_all

總結:
在Condition中有兩把鎖,預設一把RLock和一把Lock。結合方法理解兩把鎖的作用:

  • wait():呼叫首先判斷當前執行緒是否上鎖,此時的鎖是RLock鎖,接著通過_allocate_lock()獲取一把Lock鎖並且上鎖加入到雙端佇列中,之後呼叫_release_save把一級鎖(RLock鎖)釋放掉返回值得到當前鎖重入的次數(count)和執行緒標識(owner),然後接著再次對二級鎖(Lock)進行上鎖操作,由於Lock無法重入,也就意味著上一次如果上鎖成功,此時上鎖將處於阻塞狀態,直到當前執行緒執行notify被呼叫,滿足釋放該鎖,最後通過_acquire_restore嘗試獲取一級鎖(RLock),把恢復之前儲存的狀態saved_state
  • notify:呼叫首先判斷當前執行緒是否上鎖,此時的鎖是RLock鎖,備份當前執行緒二級鎖(Lock)佇列;定義新得佇列,將前n個二級鎖轉移到新佇列,如果佇列為空直接返回,佇列不為空,迴圈釋放二級鎖,wait函式中的wait.acquire()喚醒;最後嘗試將該鎖踢出佇列

6.5 生產者消費者(多執行緒Condition)

生產者消費者問題:假設和有一群生產者(Producer)和一群消費者(Consumer)通過一個市場來交換產品。

  • 生產者策略:如果市場剩餘產品小於20個,那麼就生產4個產品放到市場
  • 消費者策略:如果市場上剩餘產品多於10個,那麼就從市場上消費3個產品
from threading import Thread, Condition
import time

cond = Condition()
count = 0

class Producer(Thread):
    def run(self):
        global count, cond
        while True:
            cond.acquire()
            if count < 20:
                count += 4
                print("%s:生產者生產了4個,當前總共%d個" % (self.name, count))
                cond.notify()
            else:
                print("%s:不生產,等待" % self.name)
                cond.wait()
            cond.release()
            time.sleep(2)

class Consumer(Thread):
    def run(self):
        global count, cond
        while True:
            cond.acquire()
            if count > 10:
                count -= 3
                print("%s:消費者消費了3個,當前總共%d個" % (self.name, count))
                cond.notify()
            else:
                print("%s:不消費,等待" % self.name)
                cond.wait()
            cond.release()
            time.sleep(2)

for i in range(3):
    Producer().start()

for i in range(3):
    Consumer().start()

6.6 Semaphore(訊號量)

標準庫模組threading中提供了類物件Semaphore,用於表示訊號量,他可以幫助我們控制併發執行緒的最大數量從而實現多執行緒之間的同步

from threading import Semaphore, Thread
import time, random

sem = Semaphore(3)

class MyThread(Thread):
    def run(self):
        """
        sem.acquire()
        print("%s獲得資源" % self.name)
        time.sleep(random.random() * 10)
        sem.release()
        """
        with sem:
            print("%s獲得資源" % self.name)
            time.sleep(random.random() * 10)

for i in range(10):
    MyThread().start()

原始碼分析:

class Semaphore:

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock()) # 定義條件變數,傳入底層鎖
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None

        # 本質是呼叫Condition的__enter__方法,最終是呼叫的Lock的acquire方法,上鎖
        with self._cond:
            while self._value == 0: # 計數器
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        """
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        self.release()

6.7 Event

類物件Event也可以實現執行緒間同步,Event例項物件管理著一個內部標誌,通過改變這個內部標誌的值可以讓一個執行緒給其他處於阻塞狀態的執行緒傳送一個事件訊號。從而喚醒這些執行緒讓他們轉換為執行好狀態

  1. set(self):將內部標誌設定為True
  2. is_set(self):將判斷內部標誌時候給設定為True
  3. clear(self):將內部標誌設定為False
  4. wait(self,timeout=None):但內部標誌為False時,呼叫該方法的執行緒會阻塞。直到另外一個執行緒呼叫了set()將內部標誌設定為True,被阻塞的執行緒才會轉為執行狀態。
from threading import Thread, Event, current_thread

import time

event = Event()
print(event.is_set())  # False

def do_something():
    print("%s開始等待" % current_thread().name)
    event.wait()
    print("%s結束等待" % current_thread().name)

for i in range(3):
    Thread(target=do_something).start()

time.sleep(2)
event.set()

7. 程序間通訊

7.1 共享記憶體

如果想要實現程序之間的通訊,共享記憶體是常見的實現方式之一,它允許多個程序直接訪問同一塊記憶體

共享記憶體中的物件的型別必須是ctypes的,ctypes是與C語言相容的資料型別。為了在共享記憶體中建立ctypes型別的物件,標準庫模組multiprocessing提供了以下兩個類:

(1) Value(typecode_or_type,*args,**kwargs):

  • typecode_or_type : 用於指定數值的型別碼或ctypes型別

(2) Array(typecode_or_type,size_or_initializer,lock=True):    

  • typecode_or_type : 用於指定數值的型別碼或ctypes型別
  • size_or_initializer : 用於指定陣列的長度或Python中的序列
from multiprocessing import Value, Array, Process
import ctypes

# 在共享記憶體中建立一個表示數值的ctypes物件
num = Value('d', 2.3)
# num = Value(ctypes.c_float,2.3)

# 在共享記憶體中建立一個表示陣列的ctypes物件
arr = Array('i', range(1, 5))

# arr = Array(ctypes.c_int, range(1, 5))

def do_something():
    num.value = 1.8
    for i in range(len(arr)):
        arr[i] = -arr[i]

p = Process(target=do_something)
p.start()
p.join()

print(num.value)  # 1.8
print(arr[:])  # [-1, -2, -3, -4]

7.2 管道

函式Pipe(),其返回值是一個元組,元組中包含兩個物件,分別表示管道兩端的連線。

呼叫Pipe()時,如果不傳入引數或傳入的引數為True,管道的工作物件為雙向如果傳入的引數為False,管道的工作方式為單向,其中對於返回的元組,第一個連線物件只能接受資料,第二個連線物件只能傳送資料對於管道兩端的連線物件,主要有兩個方法:

  1. send(self,obj):用於將引數obj指定的物件傳送到管道
  2. recv(self):用於從管道接收物件
from multiprocessing import Pipe

conn1, conn2 = Pipe()  # 分別表示兩端的管道連線,引數為False表示單向,為True表示為雙向(預設)

conn1.send('conn1傳送資料1')
conn1.send('conn1傳送資料2')

conn2.send('conn2傳送資料1') 
conn2.send('conn2傳送資料2')

print(conn1.recv()) # conn2傳送資料1
print(conn1.recv()) # conn2傳送資料2
print(conn2.recv()) # conn1傳送資料1
print(conn2.recv()) # conn1傳送資料2

c1, c2 = Pipe(False)

# c1.send("c1")  # OSError: connection is read-only c1只能接收,不能傳送
# print(c2.recv())

c2.send("c2")
print(c1.recv()) # c2

7.3 Manager

如果想要實現程序之間的通訊,Manager也是常見的實現方式之一與共享記憶體相比,Manager更加靈活,因為它可以支援多種物件型別,此外,Manager還可以通過網路被不同計算機上的程序所共享。但是Manager的速度要比共享記憶體慢

from multiprocessing import Manager, Process

def func():
    d[1] = 18
    d['2'] = 56
    l.reverse()

manager = Manager()

# 通過manager建立了一個同於程序通訊的字典
d = manager.dict()

# 通過manager建立了一個同於程序通訊的列表
l = manager.list(range(5))
p = Process(target=func) # 子程序執行

p.start()
p.join()

# 主程序檢視資料時候被修改
print(d) # {1: 18, '2': 56}
print(l) # [4, 3, 2, 1, 0]

8. 定時器執行緒

如果想要在指定的時間片段之後再啟動子執行緒,可以使用標準庫模組threading提供的類物件Timer,用於表示定時器執行緒,Timer是Thread的子類,也可以通過方法start()來啟動執行緒。定時器只執行一次。如果需要每個一段時間執行一次,則需要在子執行緒呼叫的函式內部再次建立啟動子執行緒

from threading import Timer

def do_something():
    print("do something")
    
timer = Timer(2, do_something)
timer.start()

timer.cancel()  # 取消定時器

9. ThreadLocal