1. 程式人生 > >程序、執行緒和協程

程序、執行緒和協程

一、程序

1、多工原理

  多工是指作業系統同時可以執行多個任務。

  • 單核CPU實現多工原理:作業系統輪流讓各個任務交替執行;
  • 多核CPU實現多工原理:真正的執行多工只能在多核CPU上實現,多出來的任務輪流排程到每個核心上執行。
  • 併發:看上去一起執行,任務數多於CPU核心數;
  • 並行:真正的一起執行,任務數小於等於CPU核心數。

  實現多工的方式:
    1、多程序模式
    2、多執行緒模式
    3、協程模式
    4、多程序+多執行緒模式

2、程序

  對於作業系統而言,一個任務就是一個程序;

  程序是系統中程式執行和資源分配的基本單元,每個程序都有自己的資料段、程式碼段、堆疊段。


 

  下面是一小段程式,一個單任務的例子。在其中,有兩個輸出語句分別在在兩個不同的迴圈當中,單任務的執行方式,也就是最初學習時,當一個迴圈沒有結束的時候,無法執行到下面的程式當中。如果想要讓兩個迴圈可以同時在執行,就是在實現多工,當然不是說同時輸出,而是兩個迴圈都在執行著。

 1 from time import sleep
 2 # 只能執行到那一個迴圈,執行不了run,所以叫單任務
 3 def run():
 4     while True:
 5         print("&&&&&&&&&&&&&&&
") 6 sleep(1.2) 7 8 if __name__ == "__main__": 9 while True: 10 print("**********") 11 sleep(1) 12 run()

  接下來啟用多工,通過程序來實現。

  multiprocessing庫:跨平臺版本的多程序模組,提供了一個Process類來代表一個程序物件(fork僅適用於Linux)。

  下面的程式是在一個父程序中建立一個子程序,讓父程序和子程序可以都在執行,建立方式程式中已經很簡潔了。可以自己把這兩段程式複製下來執行一下,看看輸出的效果。

 1 from multiprocessing import Process
 2 from time import sleep
 3 import os
 4 
 5 def run(str):
 6     # os.getpid()獲取當前程序id號
 7     # os.getppid()獲取當前程序的父程序id號
 8     while True:
 9         print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid()))
10         sleep(0.5)
11 
12 if __name__ == "__main__":
13     print("主(父)程序啟動 %s" % (os.getpid()))
14     # 建立子程序
15     # target說明程序執行的任務
16     p = Process(target=run, args=("nice",))
17     # 啟動程序
18     p.start()
19 
20     while True:
21         print("**********")
22         sleep(1)

  我想第一個單任務的程式就不必說了吧,就是一個死迴圈,一直沒有執行到下面的run函式。第二段程式是通過多程序實現的多工,兩個迴圈都能執行到,我把結果截圖放下面,最好自己去試一下。

3、父子程序的先後順序

  上面的多程序的例子中輸出了那麼多,我們使用的時候究竟是先執行哪個後執行哪個呢?根據我們的一般思維來說,我們寫的主函式其實就是父程序,在主函式中間,要呼叫的也就是子程序。

 

 1 from multiprocessing import Process
 2 from time import sleep
 3 import os
 4 
 5 def run():
 6     print("啟動子程序")
 7     print("子程序結束")
 8     sleep(3)
 9 
10 if __name__ == "__main__":
11     print("父程序啟動")
12     p = Process(target=run)
13     p.start()
14 
15     # 父程序的結束不能影響子程序,讓程序等待子程序結束再執行父程序
16     p.join()
17 
18     print("父程序結束")

4、全域性變數在多個程序中不能共享 

  在多程序的程式當中定義的全域性變數在多個程序中是不能共享的,篇幅較長在這裡就不舉例子了,可以自己試一下。這個也是和稍後要說的執行緒的一個區別,線上程中,變數是可以共享的,也因此衍生出一些問題,稍後再說。

5、啟動多個程序 

  在正常工作使用的時候,當然不止有有個一個兩個程序,畢竟這一兩個也起不到想要的效果。那麼就需要採用更多的程序,這時候需要通過程序池來實現,就是在程序池中放好你要建立的程序,然後執行的時候,把他們都啟動起來,就可以同時進行了,在一定的環境下可以大大的提高效率。當然這個也和起初提到的有關,如果你的CPU是單核的,那麼多程序也只是起到了讓幾個任務同時在執行著,並沒有提高效率,而且啟動程序的時候還要花費一些時間,因此在多核CPU當中更能發揮優勢。

  在multiprocessing中有個Pool方法,可以實現程序池。在利用程序池時可以設定要啟動幾個程序,一般情況下,它預設和你電腦的CPU核數一致,也可以自己設定,如果設定的程序數多於CPU核數,那多出來的程序會輪流排程到每個核心上執行。下面是啟動多個程序的過程。

 1 from multiprocessing import Pool
 2 import os
 3 import time
 4 import random
 5 
 6 
 7 def run(name):
 8     print("子程序%s啟動--%s" % (name, os.getpid()))
 9     start = time.time()
10     time.sleep(random.choice([1,2,3,4,5]))
11     end = time.time()
12     print("子程序%s結束--%s--耗時%.2f" % (name, os.getpid(), end-start))
13 
14 if __name__ == "__main__":
15     print("啟動父程序")
16 
17     # 建立多個程序
18     # Pool 程序池 :括號裡的數表示可以同時執行的程序數量
19     # Pool()預設大小是CPU核心數
20     pp = Pool(4)
21     for i in range(5):
22         # 建立程序,放入程序池,統一管理
23         pp.apply_async(run, args=(i,))
24 
25     # 在呼叫join之前必須先呼叫close,呼叫close之後就不能再繼續新增新的程序了
26     pp.close()
27     # 程序池物件呼叫join還等待程序池中所有的子程序結束
28     pp.join()
29 
30     print("結束父程序")

6、檔案拷貝(單程序與多程序對比)

(1)單程序實現

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 實現檔案的拷貝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\執行緒、協程'
15 toPath = r'F:\python_note\test'
16 
17 # 讀取path下的所有檔案
18 filesList = os.listdir(path)
19 
20 # 啟動for迴圈處理每一個檔案
21 start = time.time()
22 for fileName in filesList:
23     copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName))
24 
25 end = time.time()
26 print('總耗時:%.2f' % (end-start))
View Code

(2)多程序實現

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 實現檔案的拷貝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\執行緒、協程'
15 toPath = r'F:\python_note\test'
16 
17 
18 if __name__ == "__main__":
19     # 讀取path下的所有檔案
20     filesList = os.listdir(path)
21 
22     start = time.time()
23     pp = Pool(4)
24     for fileName in filesList:
25         pp.apply_async(copyFile, args=(os.path.join(
26             path, fileName), os.path.join(toPath, fileName)))
27     pp.close()
28     pp.join()
29     end = time.time()
30     print("總耗時:%.2f" % (end - start))
View Code

  上面兩個程式是兩種方法實現同一個目標的程式,可以將其中的檔案路徑更換為你自己的路徑,可以看到最後計算出的耗時是多少。也許有人發現並不是多程序的效率就高,說的的確沒錯,因為建立程序也要花費時間,沒準啟動程序的時間遠多讓這一個核心執行所有核心用的時間要多。這個例子也只是演示一下如何使用,在大資料的任務下會有更深刻的體驗。

 7、程序物件

  我們知道Python是一個面向物件的語言。而且Python中萬物皆物件,程序也可以封裝成物件,來方便以後自己使用,只要把他封裝的足夠豐富,提供清晰的介面,以後使用時會快捷很多,這個就根據自己的需求自己可以試一下,不寫了。

 8、程序間通訊

  上面提到過程序間的變數是不能共享的,那麼如果有需要該怎麼辦?通過佇列的方式進行傳遞。在父程序中建立佇列,然後把佇列傳到每個子程序當中,他們就可以共同對其進行操作。 

 1 from multiprocessing import Process, Queue
 2 import os
 3 import time
 4 
 5 
 6 def write(q):
 7     print("啟動寫子程序%s" % (os.getpid()))
 8     for chr in ['A', 'B', 'C', 'D']:
 9         q.put(chr)
10         time.sleep(1)
11     print("結束寫子程序%s" % (os.getpid()))
12 
13 def read(q):
14     print("啟動讀子程序%s" % (os.getpid()))
15     while True:
16         value = q.get()
17         print("value = "+value)
18     print("結束讀子程序%s" % (os.getpid()))
19 
20 if __name__ == "__main__":
21     # 父程序建立佇列,並傳遞給子程序
22     q = Queue()
23     pw = Process(target=write, args=(q,))
24     pr = Process(target=read, args=(q,))
25 
26     pw.start()
27     pr.start()
28     # 寫程序結束
29     pw.join()
30     # pr程序裡是個死迴圈,無法等待期結束,只能強行結束
31     pr.terminate()
32     print("父程序結束")

 二、執行緒

1、執行緒

  • 在一個程序內部,要同時幹多件事,就需要執行多個"子任務",我們把程序內的多個"子任務"叫做執行緒
  • 執行緒通常叫做輕型的程序,執行緒是共享記憶體空間,併發執行的多工,每一個執行緒都共享一個程序的資源
  • 執行緒是最小的執行單元而程序由至少一個執行緒組成。如何排程程序和執行緒,完全由作業系統來決定,程式自己不能決定什麼時候執行,執行多長時間

模組:

1、_thread模組 低階模組(更接近底層)

2、threading模組 高階模組,對_thread進行了封裝

2、啟動一個執行緒

  同樣,先給一個多執行緒的例子,其中,仍然使用run函式作為其中的一個子執行緒,主函式為父執行緒。通過threading的Thread方法建立執行緒並開啟,join來等待子執行緒。

 1 import threading
 2 import time
 3 
 4 
 5 def run():
 6     print("子執行緒(%s)啟動" % (threading.current_thread().name))
 7 
 8     # 實現執行緒的功能
 9     time.sleep(1)
10     print("列印")
11     time.sleep(2)
12 
13     print("子執行緒(%s)結束" % (threading.current_thread().name))
14 
15 
16 if __name__ == "__main__":
17     # 任何程序都預設會啟動一個執行緒,稱為主執行緒,主執行緒可以啟動新的子執行緒
18     # current_thread():返回執行緒的例項
19     print("主執行緒(%s)啟動" % (threading.current_thread().name))
20 
21     # 建立子執行緒
22     t = threading.Thread(target=run, name="runThread")
23     t.start()
24 
25     # 等待執行緒結束
26     t.join()
27 
28     print("主執行緒(%s)結束" % (threading.current_thread().name))

3、執行緒間資料共享

  多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自有一份拷貝存在每個程序中,互不影響。

  而多執行緒所有變數都由所有執行緒共享。所以任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享資料最大的危險在於多個執行緒同時修改一個變數,容易把內容改亂了。

 1 import threading
 2 
 3 
 4 num = 10
 5 
 6 def run(n):
 7     global num
 8     for i in range(10000000):
 9         num = num + n
10         num = num - n
11 
12 if __name__ == "__main__":
13     t1 = threading.Thread(target=run, args=(6,))
14     t2 = threading.Thread(target=run, args=(9,))
15 
16     t1.start()
17     t2.start()
18     t1.join()
19     t2.join()
20 
21     print("num = ",num)

4、執行緒鎖

  在第三小點中已經提到了,多執行緒的一個缺點就是資料是共享的,如果有兩個執行緒正同時在修改這個資料,就會出現混亂,它自己也不知道該聽誰的了,尤其是在運算比較複雜,次數較多的時候,這種錯誤的機會會更大。

  當然,解決辦法也是有的,那就是利用執行緒鎖。加鎖的意思就是在其中一個執行緒正在對資料進行操作時,讓其他執行緒不得介入。這個加鎖和釋放鎖是由人來確定的。

  • 確保了這段程式碼只能由一個執行緒從頭到尾的完整執行
  • 阻止了多執行緒的併發執行,要比不加鎖時候效率低。包含鎖的程式碼段只能以單執行緒模式執行
  • 由於可以存在多個鎖,不同執行緒持有不同的鎖,並試圖獲取其他的鎖,可能造成死鎖導致多個執行緒掛起,只能靠作業系統強制終止
 1 def run(n):
 2     global num
 3     for i in range(10000000):    
 4         lock.acquire()
 5         try:
 6             num = num + n
 7             num = num - n
 8         finally:
 9             # 修改完釋放鎖
10             lock.release()
11 
12 if __name__ == "__main__":
13     t1 = threading.Thread(target=run, args=(6,))
14     t2 = threading.Thread(target=run, args=(9,))
15 
16     t1.start()
17     t2.start()
18     t1.join()
19     t2.join()
20 
21     print("num = ",num)

  上面這段程式是迴圈多次num+n-n+n-n的過程,變數n分別設為6和9是在兩個不同的執行緒當中,程式中已經加了鎖,你可以先去掉試一下,當迴圈次數較小的時候也許還能正確,但次數一旦取的較高就會出現混亂。

  加鎖是在迴圈體當中,依次執行加減法,定義中說到確保一個執行緒從頭到尾的完整執行,也就是在計算途中,不會有其他的執行緒打擾。你可以想一下,如果一個執行緒執行完加法,正在執行減法,另一個執行緒進來了,它要先進行加法時的初始sum值該是多少呢,執行緒二不一定線上程一的什麼時候進來,萬一剛進來時候,執行緒一恰好給sum賦值了,而執行緒二仍然用的是正準備進來時候的sum值,那從這裡開始豈不已經分道揚鑣了。所以,運算的次數越多,結果會越離譜。

  這個說完了,還有一個小小的改進。你是否記得讀寫檔案時候書寫的一種簡便形式,通過with來實現,可以避免我們忘記關閉檔案,自動幫我們關閉。當然還有一些其他地方也用到了這個方法。這裡也同樣適用。

1 # 與上面程式碼功能相同,with lock可以自動上鎖與解鎖
2 with lock:
3     num = num + n
4     num = num - n

5、ThreadLocal

  • 建立一個全域性的ThreadLocal物件
  • 每個執行緒有獨立的儲存空間
  • 每個執行緒對ThreadLocal物件都可以讀寫,但是互不影響

  根據名字也可以看出,也就是在本地建個連線,所有的操作在本地進行,每個執行緒之間沒有資料的影響。

 1 import threading
 2 
 3 
 4 num = 0
 5 local = threading.local()
 6 
 7 def run(x, n):
 8     x = x + n
 9     x = x - n
10 
11 def func(n):
12     # 每個執行緒都有local.x
13     local.x = num
14     for i in range(10000000):
15         run(local.x, n)
16     print("%s-%d" % (threading.current_thread().name, local.x))
17 
18 
19 if __name__ == "__main__":
20     t1 = threading.Thread(target=func, args=(6,))
21     t2 = threading.Thread(target=func, args=(9,))
22 
23     t1.start()
24     t2.start()
25     t1.join()
26     t2.join()
27 
28     print("num = ",num)

6、控制執行緒數量

 1 '''
 2 控制執行緒數量是指控制執行緒同時觸發的數量,可以拿下來這段程式碼執行一下,下面啟動了5個執行緒,但是他們會兩個兩個的進行
 3 '''
 4 import threading
 5 import time
 6 
 7 # 控制併發執行執行緒的數量
 8 sem = threading.Semaphore(2)
 9 
10 def run():
11     with sem:
12         for i in range(10):
13             print("%s---%d" % (threading.current_thread().name, i))
14             time.sleep(1)
15 
16 
17 if __name__ == "__main__":
18     for i in range(5):
19         threading.Thread(target=run).start()

  上面的程式是有多個執行緒,但是每次限制同時執行的執行緒,通俗點說就是限制併發執行緒的上限;除此之外,也可以限制執行緒數量的下限,也就是至少達到多少個執行緒才能觸發。

 1 import threading
 2 import time
 3 
 4 
 5 # 湊夠一定數量的執行緒才會執行,否則一直等著
 6 bar = threading.Barrier(4)
 7 
 8 def run():
 9     print("%s--start" % (threading.current_thread().name))
10     time.sleep(1)
11     bar.wait()
12     print("%s--end" % (threading.current_thread().name))
13 
14 
15 if __name__ == "__main__":
16     for i in range(5):
17         threading.Thread(target=run).start()

7、定時執行緒

 1 import threading
 2 
 3 
 4 def run():
 5     print("***********************")
 6 
 7 # 延時執行執行緒
 8 t = threading.Timer(5, run)
 9 t.start()
10 
11 t.join()
12 print("父執行緒結束")

8、執行緒通訊

 1 import threading
 2 import time
 3 
 4 
 5 def func():
 6     # 事件物件
 7     event = threading.Event()
 8     def run():
 9         for i in range(5):
10             # 阻塞,等待事件的觸發
11             event.wait()
12             # 重置阻塞,使後面繼續阻塞
13             event.clear()
14             print("**************")
15     t = threading.Thread(target=run).start()
16     return event
17 
18 e = func()
19 
20 # 觸發事件
21 for i in range(5):
22     time.sleep(2)
23     e.set()

9、一個小栗子

  這個例子是用了生產者和消費者來模擬,要進行資料通訊,還引入了佇列。先來理解一下。

 1 import threading
 2 import queue
 3 import time
 4 import random
 5 
 6 
 7 # 生產者
 8 def product(id, q):
 9     while True:
10         num = random.randint(0, 10000)
11         q.put(num)
12         print("生產者%d生產了%d資料放入了佇列" % (id, num))
13         time.sleep(3)
14     # 任務完成
15     q.task_done()
16 
17 # 消費者
18 def customer(id, q):
19     while True:
20         item = q.get()
21         if item is None:
22             break
23         print("消費者%d消費了%d資料" % (id, item))
24         time.sleep(2)
25     # 任務完成
26     q.task_done()
27 
28 
29 if __name__ == "__main__":
30     # 訊息佇列
31     q = queue.Queue()
32 
33     # 啟動生產者
34     for i in range(4):
35         threading.Thread(target=product, args=(i, q)).start()
36 
37     # 啟動消費者
38     for i in range(3):
39         threading.Thread(target=customer, args=(i, q)).start()

10、執行緒排程

 1 import threading
 2 import time
 3 
 4 
 5 # 執行緒條件變數
 6 cond = threading.Condition()
 7 
 8 
 9 def run():
10     with cond:
11         for i in range(0, 10, 2):
12             print(threading.current_thread().name, i)
13             time.sleep(1)
14             cond.wait()  # 阻塞
15             cond.notify()  # 告訴另一個執行緒可以執行
16 
17 
18 def run2():
19     with cond:
20         for i in range(1, 10, 2):
21             print(threading.current_thread().name, i)
22             time.sleep(1)
23             cond.notify()
24             cond.wait()
25 
26 
27 threading.Thread(target=run).start()
28 threading.Thread(target=run2).start()

三、協程

1、協程

  • 子程式/子函式:在所有語言中都是層級呼叫,比如A呼叫B,在B執行的工程中又可以呼叫C,C執行完畢返回,B執行完畢返回最後是A執行完畢。是通過棧實現的,一個執行緒就是一個子程式,子程式呼叫總是一個入口,一次返回,呼叫的順序是明確的
  • 協程:看上去也是子程式,但執行過程中,在子程式的內部可中斷,然後轉而執行別的子程式,不是函式呼叫,有點類似CPU中斷
 1 # 這是一個子程式的呼叫
 2 def C():
 3     print("C--start")
 4     print("C--end")
 5 
 6 def B():
 7     print("B--start")
 8     C()
 9     print("B--end")
10 
11 def A():
12     print("A--start")
13     B()
14     print("A--end")
15 
16 A()
  • 協程與子程式呼叫的結果類似,但不是通過在函式中呼叫另一個函式
  • 協程執行起來有點像執行緒,但協程的特點在於是一個執行緒
  • 與執行緒相比的優點:協程的執行效率極高,因為只有一個執行緒,也不存在同時寫變數的衝突,在協程中共享資源不加鎖,只需要判斷狀態

2、協程的原理

 1 # python對協程的支援是通過generator實現的
 2 def run():
 3     print(1)
 4     yield 10
 5     print(2)
 6     yield 20
 7     print(3)
 8     yield 30
 9 
10 # 協程的最簡單風格,控制函式的階段執行,節約執行緒或者程序的切換
11 # 返回值是一個生成器
12 m = run()
13 print(next(m))
14 print(next(m))
15 print(next(m))

3、資料傳輸

 1 # python對協程的支援是通過generator實現的
 2 def run():
 3     print(1)
 4     yield 10
 5     print(2)
 6     yield 20
 7     print(3)
 8     yield 30
 9 
10 # 協程的最簡單風格,控制函式的階段執行,節約執行緒或者程序的切換
11 # 返回值是一個生成器
12 m = run()
13 print(next(m))
14 print(next(m))
15 print(next(m))

4、小栗子

 1 def product(c):
 2     c.send(None)
 3     for i in range(5):
 4         print("生產者產生資料%d" % (i))
 5         r = c.send(str(i))
 6         print("消費者消費了資料%s" % (r))
 7     c.close()
 8 
 9 
10 def customer():
11     data = ""
12     while True:
13         n = yield data
14         if not n:
15             return
16         print("消費者消費了%s" % (n))
17         data = "200"
18 
19 
20 c = customer()
21 product(c)