1. 程式人生 > >Python入門基礎(10)——併發程式設計

Python入門基礎(10)——併發程式設計

一、概念介紹

1、執行緒與程序的基礎概念

這裡就不詳細介紹了,直接被百度吧,一大堆

2、全域性直譯器鎖(GIL)

(1)GIL全稱全域性直譯器鎖Global Interpreter Lock,GIL並不是Python的特性,它是在實現Python解析器(CPython)時

所引入的一個概念。

(2)GIL是一把全域性排他鎖,同一時刻只有一個執行緒在執行。

  •  毫無疑問全域性鎖的存在會對多執行緒的效率有不小影響。甚至就幾乎等於Python是個單執行緒的程式。
  • multiprocessing庫的出現很大程度上是為了彌補thread庫因為GIL而低效的缺陷。它完整的複製了一套thread所提供的介面方便遷移。唯一的不同就是它使用了多程序而不是多執行緒。每個程序有自己的獨立的GIL,因此也不會出現程序之間的GIL爭搶。

二、多執行緒

1、單執行緒的開始與結束

import threading  #匯入執行緒模組,可以建立多個執行緒,並且可以在多執行緒間進行通訊與同步
import time  #匯入時間模組,一般併發程式設計都要用到


def thread_run(name):
    print("%s's first thread!" % name)  #%s用於輸出變數,和C差不多
    time.sleep(5)  #停頓5秒

if __name__ == '__main__':
    LiMing = threading.Thread(target=thread_run("李明"))
    Zhangsan = threading.Thread(target=thread_run,args=('張三',))
    LiMing.start()
    Zhangsan.start()

上面直接匯入了一個執行緒的例項,首先要解決如下幾個問題:

(1)匯入執行緒模組有兩種方法

#第一種方法
import threading  #匯入執行緒模組,可以建立多個執行緒,並且可以在多執行緒間進行通訊與同步

LiMing = threading.Thread(target=thread_run("李明"))  #建立執行緒
LiMing.start()

#第二種方法
from threading import Thread  #匯入執行緒模組

LiMing = Thread(target=thread_run("李明"))  #建立執行緒
LiMing.start()

(2)傳參問題

#共有兩種方法
from threading import Thread  #匯入執行緒模組

LiMing = Thread(target=thread_run("李明"))  #第一種方法,直接在函式裡面寫引數
ZhangSan = Thread(target=thread_run(),  args=("張三",))   #第二種方法,多寫一個引數
LiMing.start()

我看了很多執行緒類的例項,大都用第二種方法,第一種方法是在寫程式碼的時候,程式預設寫出來的

(3)start()函式

開啟執行緒,開啟之後,執行緒就會獨立執行它的目標函式(也就是建立執行緒時的target函式)

(4)join()函式

開啟之後,就會阻塞主執行緒的向下執行,直到呼叫join方法的執行緒執行結束以後,方才會繼續執行

#不加入join函式
import time
import threading

def thread_run(name):
    time.sleep(2)
    print("%s's first thread!!!"% name)


mike = threading.Thread(target=thread_run, args=('Mike', ))
jone = threading.Thread(target=thread_run, args=('jone', ))

mike.start()
jone.start()
print('main thread is running!!!')

執行結果:
main thread is running!!!
jone's first thread!!!
Mike's first thread!!!

上面執行結果,是因為開啟mike和jone執行緒之後,當他們執行的時候,會先停頓2秒時間,這兩秒中,主程式會繼續執行,所以先輸出:main thread is running!!!,然後另外兩個執行緒停頓完畢,就相繼執行(注意這裡是併發的)

import time
import threading

def thread_run(name):
    time.sleep(2)
    print("%s's first thread!!!"% name)


mike = threading.Thread(target=thread_run, args=('Mike', ))
jone = threading.Thread(target=thread_run, args=('jone', ))

mike.start()
jone.start()
mike.join()    #阻塞子執行緒mike直到mike執行緒執行完畢
jone.join()    #阻塞子執行緒jone直到jone執行緒執行完畢
print('main thread is running!!!')

輸出結果:
Mike's first thread!!!
jone's first thread!!!
main thread is running!!!

上面啟動mike.join後,就會阻塞其他執行緒(我是這麼理解的),直到mike執行緒的目標程式執行結束,方才會執行下面的程式碼

#注意join函式只會阻塞主執行緒,不會阻塞其他執行緒
import time
import threading

def thread_run1(name):
    time.sleep(5)
    print("%s's first thread!!!"% name)

def thread_run2(name):
    time.sleep(2)
    print("%s's first thread!!!"% name)

mike = threading.Thread(target=thread_run1, args=('Mike', ))
jone = threading.Thread(target=thread_run2, args=('jone', ))

mike.start()
jone.start()
mike.join()    #阻塞子執行緒mike直到mike執行緒執行完畢
jone.join()    #阻塞子執行緒jone直到jone執行緒執行完畢
print('main thread is running!!!')

輸出結果:
jone's first thread!!!   #可以看出jone是輸出的
Mike's first thread!!!
main thread is running!!!

我先說一下,上面這個程式碼的執行順序,首先是建立了主執行緒,然後主執行緒建立了mike執行緒,mike執行緒開啟就開始執行thread_run1,先停頓5秒,與此同時(應該還是併發的)jone執行緒建立,開始執行thread_run2,然後要停頓2秒,之後mike呼叫了join函式,這時候主函式就停在了mike.join()這行程式碼中,不再往下執行(也就是不再執行jone.join()程式碼,知道上面mike執行緒執行結束),也就是說阻塞了主執行緒,只有當mike執行緒執行結束,主執行緒才會繼續;

但是這個時候,jone執行緒並沒有被阻塞,經過2秒後,它就開始輸出:jone's first thread!!!,而又過了3秒(總共5秒),mike執行緒休眠結束,開始輸出:mike's first thread!!!,這時候主執行緒開始啟動,輸出:main thread is running!!!

(5)主執行緒與子執行緒的優先順序

#主執行緒與新建的執行緒優先順序
import time
import threading

def thread_run1(name):     #mike執行緒的執行函式
    for i in range(100):
        print(i)
    print("%s's first thread!!!"% name)

def thread_run2(name):     #jone執行緒的執行函式
    print("%s's first thread!!!"% name)

mike = threading.Thread(target=thread_run1, args=('Mike', ))
jone = threading.Thread(target=thread_run2, args=('jone', ))

mike.start()
print('main thread is running!!!')
jone.start()
print('main thread is running!!!')

上面這幾行程式碼的執行,是想看一看主執行緒與其他執行緒的優先順序,下面貼上一下(執行了很多遍,每一遍都不一樣)



大致優先順序是,同時併發執行,沒有明確的優先順序(當然可能是有優先順序的,只是我現在還沒有接觸到),當然第二幅圖中兩句話的輸出緊挨著,這是一個偶然現象,之後又試了幾次,兩者並沒有挨著

但是總的來說,一旦執行緒建立,就會立刻執行自己的目標函式,執行順序與主執行緒可以說是同步的

2、執行緒Thread的常用函式

(1)is_alive()函式

用來判斷執行緒是否還在執行,當執行緒建立成功,但是還沒start()開啟時,會返回False;當執行緒已經執行後並結束,也會返回False

import time
import threading

def thread_run(name):
    time.sleep(2)
    print("%s's first thread!!!"% name)

mike = threading.Thread(target=thread_run, args=('Mike', ))
print("mike's status: %s" % mike.isAlive())
mike.start()
print("mike's status: %s" % mike.isAlive())
mike.join()
print("mike's status: %s" % mike.isAlive())
print('main thread is running!!!')

輸出結果:
mike's status: False
mike's status: True
Mike's first thread!!!
mike's status: False
main thread is running!!!

(2)name函式

name屬性表示執行緒的執行緒名,預設是Thread -x ,x是序號,由1開始,第一個建立的執行緒名字就是:Thread-1

import time
import threading

def thread_run(name):
    print("%s's first thread!!!"% name)
    time.sleep(5)

mike = threading.Thread(target=thread_run, args=('Mike', ), name='Th-mike')    #name設定執行緒名
jone = threading.Thread(target=thread_run, args=('jone', ))    #預設執行緒name是Thread-X

mike.start()
jone.start()
print(mike.name)    #列印執行緒名
print(jone.name)    #列印執行緒名

輸出結果:
Mike's first thread!!!
jone's first thread!!!
Th-mike
Thread-1

注意:上面的執行緒名字是可以隨意起的,但是你一旦不寫,就會執行預設執行緒名,另外區分好執行緒名和執行緒的引用名

(3)setName()和getName()

setName()顧名思義,就是設定執行緒的名字;而getName()則是得到執行緒的名字;因為這兩個都比較簡單,就一起寫了

import time
import threading

def thread_run(name):
    print("%s's first thread!!!"% name)
    time.sleep(5)

mike = threading.Thread(target=thread_run, args=('Mike', ))
jone = threading.Thread(target=thread_run, args=('jone', ))    #預設執行緒name是Thread-X

mike.setName('Thread-mike')    #name設定執行緒名

noe = threading.Thread(target=thread_run, args=('noe', ))    #預設執行緒name是Thread-X
mike.start()
jone.start()
print(mike.getName())    #列印執行緒名
print(jone.name)    #列印執行緒名
print(noe.name)    #列印執行緒名

輸出結果:
Mike's first thread!!!
jone's first thread!!!
Thread-mike
Thread-2   #因為mike一開始用的是thread-1,所以jone就只能緊接著變成2了
Thread-3   #這裡其實我有點不太懂了,mike在noe建立之前就已經拋棄了thread-1,但是為什麼noe依然是3呢

好吧,getName()和.name其實是一回事,最後一點疑問有點沒有必要,人家就是這樣的規矩,記住就可以了,畢竟不是什麼內涵東西

(4)daemon()

這個函式還是很有作用的

  • 當daemon = False時,執行緒不會隨主執行緒退出而退出(預設時,就是daemon = False)
  • 當daemon = True時,當主執行緒結束,其他子執行緒就會被強制結束(不管你有沒有執行完,就好像玩遊戲一樣,一點你關閉了總遊戲開關,這個遊戲就徹底 吉吉了)
import time
import threading

def thread_mike_run(name):
    time.sleep(1)
    print('mike thread is running 1S')
    time.sleep(5)
    print("%s's first thread!!!"% name)

def thread_jone_run(name):
    time.sleep(2)
    print("%s's first thread!!!"% name)


mike = threading.Thread(target=thread_mike_run, args=('Mike', ), daemon=True)    #設定daemon為True
jone = threading.Thread(target=thread_jone_run, args=('jone', ))


mike.start()
jone.start()
print('main thread')   

輸出結果:
main thread
mike thread is running 1S
jone's first thread!!!

從上面可以看出,本來mike執行緒應該輸出兩行語句,但是隻輸出了一句,下面我來解釋一下整個流程:主執行緒建立,然後在主執行緒基礎上,又建立了mike和jone兩個執行緒,然後主執行緒輸出了一行程式碼,mike執行緒等待了1秒輸出了一行程式碼,jone執行緒等待了2秒輸出了一行語句,至此主執行緒執行結束(主要是等待jone執行緒執行完,因為jone的daemon設定為false,表示它是由主執行緒代理的,相當於他就是主執行緒的一部分)。

而mike執行緒因為設定daemon為true,所以它不歸主執行緒管了,管你有沒有執行結束(反正你又不歸我管了),我主執行緒要結束了,主執行緒一結束,其他任何執行緒都會強制關機!

(5)setDaemon()

用於設定daemon的值,這裡就不再多說了

mike.setDaemon(True)    #設定mike執行緒的daemon為True

3、執行緒threading常用方法函式

上面說了一堆,其實都是thread的方法函式,thread只是threading模組的一部分,那麼threading模組有哪些函式呢?

這個大神寫的部落格不錯,很詳細,推薦

4、多執行緒的順序執行與併發執行

(1)

from threading import Thread  #匯入執行緒模組
import time   #匯入時間模組

def my_counter():   #定義一個數數函式
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True

def main():    #主函式
    thread_array = {}
    start_time = time.time()    #計算當前時間,單位秒
    for tid in range(2):
        t = Thread(target=my_counter)   #執行緒呼叫數數函式
        t.start()       #執行緒開始執行
        t.join()        #執行緒執行完,阻塞一下,不讓for迴圈結束,直到當前t這個執行緒結束
    end_time = time.time()   #計算結束時間
    print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
    main()

上面是順序執行,所需要的時間為12秒(大概,運行了幾次大都不一樣),這裡不懂程式碼if _name_ =='_main_'的可以參考部落格:如何簡單地理解Python中的if __name__ == '__main__'

總結來說,就是你匯入的模組中也有main函式,如果你執行main,那麼匯入的模組中main函式也會被執行,這就不是我們想要的結果了,這行判斷的意思是:當.py檔案被直接執行時,if __name__ == '__main__'之下的程式碼塊將被執行;當.py檔案以模組形式被匯入時,if __name__ == '__main__'之下的程式碼塊不被執行。

好了接下來看,兩個執行緒的併發執行:

from threading import Thread  #匯入執行緒模組
import time   #匯入時間模組,用來計算時間

def my_counter():  #還是數數函式
    i = 0
    for _ in range(100000000):
        i = i + 1
    return True

def main():
    thread_array = {}  #定義一個詞典,key與value型別是不限的
    start_time = time.time()   #獲得開始時刻的時間
    for tid in range(2):      #這個for迴圈,是獲得兩個執行緒,並讓其同時開始(是併發,不是並行)
        t = Thread(target=my_counter)
        t.start()
        thread_array[tid] = t  #將這兩個執行緒匯入到字典中,0—第一個執行緒,1—第二個執行緒
    for i in range(2):      #這個for迴圈,是將兩個執行緒關閉
        thread_array[i].join()
    end_time = time.time()   #獲得結束時間
    print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':  #讓匯入模組中的main函式不執行
    main()

這裡計算結果是13(不過有波動,有時候是12秒多)

三、執行緒池

1、為什麼要用執行緒池

說白了,就是要靈活控制執行緒的數量,一般傳統建立執行緒,是來一個任務建立一個執行緒,但是如果當前時間來了很多工,而這些任務完成時間很多,但是數量非常大,那麼我們其實只需要幾個執行緒就可以完成,比建立大批量的執行緒更好(因為執行緒的建立和使用需要大量的時間,佔用大量的記憶體資源),執行緒池可以固定執行緒的數量

2、執行緒池的實現

四、多程序

1、multiprocessing是跨平臺版本的多程序模組,它提供了一個Process類來代表一個程序物件,下面展示一個示例程式碼:

from multiprocessing import Process   #匯入多程序模組
import time       #匯入時間模組
 
def f(n):     #定義一個平方函式
    time.sleep(1)   #這裡每次之前開始,先停頓1秒
    print (n*n)

if __name__=='__main__':   #禁止匯入的模組下面程式碼執行
    for i in range(10):    #0-9,相當於迴圈10次
        p = Process(target=f,args=[i,])   #每次迴圈都建立一個程序,這個程序的目標是執行上面定義的f函式,傳遞的引數是i(也就是n=i)
        p.start()     #程序開始執行,注意程序執行是併發的
可以看到,如果是單個程序順序執行,執行時間至少是10秒以上(因為每次執行都要停頓1秒),但是10個程序並行執行,只需要1秒多

另外,我發現執行結果每次都不一樣(並不是按照順序輸出的,而是無序),猜測這種並行執行(我也沒有搞清楚是並行,還是併發)

如果在p.start()下面加一行:p.join(),那麼它就是順序執行的了,輸出也是每個1秒輸出1個數值(順序輸出的),這是因為他每次都要等上一個程序執行結束,才會開始下一個程序

2、程序間通訊

Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞

from multiprocessing import Process, Queue  #匯入多程序和佇列
import time  #匯入時間模組


def write(q):  #寫函式
    for i in ['A', 'B', 'C', 'D', 'E']:
        print('Put %s to queue' % i)  #先列印列表中的元素
        q.put(i)    #將元素依次加入佇列中
        time.sleep(0.5)   #停頓0.5秒


def read(q):  #讀函式
    while True:
        v = q.get(True)
        print('get %s from queue' % v)
        if (v == 'E'): break;


if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()  #寫程序開始
    pr.start()  #讀程序開始
    pr.join()   #這裡是阻塞其他程序,要等到讀程序結束以後其他程序才會開始
    pr.terminate()

執行結果:


五、多程序與多執行緒對比

一般情況下,多個程序的記憶體資源是相互獨立的,而多執行緒可以共享同一個程序中的記憶體資源

from multiprocessing import Process
import threading
import time

lock = threading.Lock()  #鎖

def run(info_list, n):
    lock.acquire()
    info_list.append(n)
    lock.release()
    print('%s\n' % info_list)


if __name__ == '__main__':
    info = []
    for i in range(10):
        # target為子程序執行的函式,args為需要給函式傳遞的引數
        p = Process(target=run, args=[info, i])
        p.start()
        p.join()
    time.sleep(1)  # 這裡是為了輸出整齊讓主程序的執行等一下子程序
    print('------------threading--------------')
    for i in range(10):        #這裡是執行緒,上面是程序
        p = threading.Thread(target=run, args=[info, i])
        p.start()
        p.join()
        
輸出結果:
[0]
[1]
[2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
------------threading--------------
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5, 6, 7, 8]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

程序資源不共享,所以每個程序不會在之前程序資源基礎上進行擴充,每個list僅有一個數字,而執行緒則不同,他們共享同一個list,所以每一個執行緒都會在原有基礎上加一個數字