程序、執行緒和協程
一、程序
1、多工原理
多工是指作業系統同時可以執行多個任務。
- 單核CPU實現多工原理:作業系統輪流讓各個任務交替執行;
- 多核CPU實現多工原理:真正的執行多工只能在多核CPU上實現,多出來的任務輪流排程到每個核心上執行。
- 併發:看上去一起執行,任務數多於CPU核心數;
- 並行:真正的一起執行,任務數小於等於CPU核心數。
實現多工的方式:
1、多程序模式
2、多執行緒模式
3、協程模式
4、多程序+多執行緒模式
2、程序
對於作業系統而言,一個任務就是一個程序;
程序是系統中程式執行和資源分配的基本單元,每個程序都有自己的資料段、程式碼段、堆疊段。
下面是一小段程式,一個單任務的例子。在其中,有兩個輸出語句分別在在兩個不同的迴圈當中,單任務的執行方式,也就是最初學習時,當一個迴圈沒有結束的時候,無法執行到下面的程式當中。如果想要讓兩個迴圈可以同時在執行,就是在實現多工,當然不是說同時輸出,而是兩個迴圈都在執行著。
1 from time import sleep
2 # 只能執行到那一個迴圈,執行不了run,所以叫單任務
3 def run():
4 while True:
5 print("&&&&&&&&&&&&&&& ")
6 sleep(1.2)
7
8 if __name__ == "__main__":
9 while True:
10 print("**********")
11 sleep(1)
12 run()
接下來啟用多工,通過程序來實現。
multiprocessing庫:跨平臺版本的多程序模組,提供了一個Process類來代表一個程序物件(fork僅適用於Linux)。
下面的程式是在一個父程序中建立一個子程序,讓父程序和子程序可以都在執行,建立方式程式中已經很簡潔了。可以自己把這兩段程式複製下來執行一下,看看輸出的效果。
1 from multiprocessing import Process
2 from time import sleep
3 import os
4
5 def run(str):
6 # os.getpid()獲取當前程序id號
7 # os.getppid()獲取當前程序的父程序id號
8 while True:
9 print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid()))
10 sleep(0.5)
11
12 if __name__ == "__main__":
13 print("主(父)程序啟動 %s" % (os.getpid()))
14 # 建立子程序
15 # target說明程序執行的任務
16 p = Process(target=run, args=("nice",))
17 # 啟動程序
18 p.start()
19
20 while True:
21 print("**********")
22 sleep(1)
我想第一個單任務的程式就不必說了吧,就是一個死迴圈,一直沒有執行到下面的run函式。第二段程式是通過多程序實現的多工,兩個迴圈都能執行到,我把結果截圖放下面,最好自己去試一下。
3、父子程序的先後順序
上面的多程序的例子中輸出了那麼多,我們使用的時候究竟是先執行哪個後執行哪個呢?根據我們的一般思維來說,我們寫的主函式其實就是父程序,在主函式中間,要呼叫的也就是子程序。
1 from multiprocessing import Process
2 from time import sleep
3 import os
4
5 def run():
6 print("啟動子程序")
7 print("子程序結束")
8 sleep(3)
9
10 if __name__ == "__main__":
11 print("父程序啟動")
12 p = Process(target=run)
13 p.start()
14
15 # 父程序的結束不能影響子程序,讓程序等待子程序結束再執行父程序
16 p.join()
17
18 print("父程序結束")
4、全域性變數在多個程序中不能共享
在多程序的程式當中定義的全域性變數在多個程序中是不能共享的,篇幅較長在這裡就不舉例子了,可以自己試一下。這個也是和稍後要說的執行緒的一個區別,線上程中,變數是可以共享的,也因此衍生出一些問題,稍後再說。
5、啟動多個程序
在正常工作使用的時候,當然不止有有個一個兩個程序,畢竟這一兩個也起不到想要的效果。那麼就需要採用更多的程序,這時候需要通過程序池來實現,就是在程序池中放好你要建立的程序,然後執行的時候,把他們都啟動起來,就可以同時進行了,在一定的環境下可以大大的提高效率。當然這個也和起初提到的有關,如果你的CPU是單核的,那麼多程序也只是起到了讓幾個任務同時在執行著,並沒有提高效率,而且啟動程序的時候還要花費一些時間,因此在多核CPU當中更能發揮優勢。
在multiprocessing中有個Pool方法,可以實現程序池。在利用程序池時可以設定要啟動幾個程序,一般情況下,它預設和你電腦的CPU核數一致,也可以自己設定,如果設定的程序數多於CPU核數,那多出來的程序會輪流排程到每個核心上執行。下面是啟動多個程序的過程。
1 from multiprocessing import Pool
2 import os
3 import time
4 import random
5
6
7 def run(name):
8 print("子程序%s啟動--%s" % (name, os.getpid()))
9 start = time.time()
10 time.sleep(random.choice([1,2,3,4,5]))
11 end = time.time()
12 print("子程序%s結束--%s--耗時%.2f" % (name, os.getpid(), end-start))
13
14 if __name__ == "__main__":
15 print("啟動父程序")
16
17 # 建立多個程序
18 # Pool 程序池 :括號裡的數表示可以同時執行的程序數量
19 # Pool()預設大小是CPU核心數
20 pp = Pool(4)
21 for i in range(5):
22 # 建立程序,放入程序池,統一管理
23 pp.apply_async(run, args=(i,))
24
25 # 在呼叫join之前必須先呼叫close,呼叫close之後就不能再繼續新增新的程序了
26 pp.close()
27 # 程序池物件呼叫join還等待程序池中所有的子程序結束
28 pp.join()
29
30 print("結束父程序")
6、檔案拷貝(單程序與多程序對比)
(1)單程序實現
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 實現檔案的拷貝 6 def copyFile(rPath, wPath): 7 fr = open(rPath, 'rb') 8 fw = open(wPath, 'wb') 9 context = fr.read() 10 fw.write(context) 11 fr.close() 12 fw.close() 13 14 path = r'F:\python_note\執行緒、協程' 15 toPath = r'F:\python_note\test' 16 17 # 讀取path下的所有檔案 18 filesList = os.listdir(path) 19 20 # 啟動for迴圈處理每一個檔案 21 start = time.time() 22 for fileName in filesList: 23 copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName)) 24 25 end = time.time() 26 print('總耗時:%.2f' % (end-start))View Code
(2)多程序實現
1 from multiprocessing import Pool 2 import time 3 import os 4 5 # 實現檔案的拷貝 6 def copyFile(rPath, wPath): 7 fr = open(rPath, 'rb') 8 fw = open(wPath, 'wb') 9 context = fr.read() 10 fw.write(context) 11 fr.close() 12 fw.close() 13 14 path = r'F:\python_note\執行緒、協程' 15 toPath = r'F:\python_note\test' 16 17 18 if __name__ == "__main__": 19 # 讀取path下的所有檔案 20 filesList = os.listdir(path) 21 22 start = time.time() 23 pp = Pool(4) 24 for fileName in filesList: 25 pp.apply_async(copyFile, args=(os.path.join( 26 path, fileName), os.path.join(toPath, fileName))) 27 pp.close() 28 pp.join() 29 end = time.time() 30 print("總耗時:%.2f" % (end - start))View Code
上面兩個程式是兩種方法實現同一個目標的程式,可以將其中的檔案路徑更換為你自己的路徑,可以看到最後計算出的耗時是多少。也許有人發現並不是多程序的效率就高,說的的確沒錯,因為建立程序也要花費時間,沒準啟動程序的時間遠多讓這一個核心執行所有核心用的時間要多。這個例子也只是演示一下如何使用,在大資料的任務下會有更深刻的體驗。
7、程序物件
我們知道Python是一個面向物件的語言。而且Python中萬物皆物件,程序也可以封裝成物件,來方便以後自己使用,只要把他封裝的足夠豐富,提供清晰的介面,以後使用時會快捷很多,這個就根據自己的需求自己可以試一下,不寫了。
8、程序間通訊
上面提到過程序間的變數是不能共享的,那麼如果有需要該怎麼辦?通過佇列的方式進行傳遞。在父程序中建立佇列,然後把佇列傳到每個子程序當中,他們就可以共同對其進行操作。
1 from multiprocessing import Process, Queue
2 import os
3 import time
4
5
6 def write(q):
7 print("啟動寫子程序%s" % (os.getpid()))
8 for chr in ['A', 'B', 'C', 'D']:
9 q.put(chr)
10 time.sleep(1)
11 print("結束寫子程序%s" % (os.getpid()))
12
13 def read(q):
14 print("啟動讀子程序%s" % (os.getpid()))
15 while True:
16 value = q.get()
17 print("value = "+value)
18 print("結束讀子程序%s" % (os.getpid()))
19
20 if __name__ == "__main__":
21 # 父程序建立佇列,並傳遞給子程序
22 q = Queue()
23 pw = Process(target=write, args=(q,))
24 pr = Process(target=read, args=(q,))
25
26 pw.start()
27 pr.start()
28 # 寫程序結束
29 pw.join()
30 # pr程序裡是個死迴圈,無法等待期結束,只能強行結束
31 pr.terminate()
32 print("父程序結束")
二、執行緒
1、執行緒
- 在一個程序內部,要同時幹多件事,就需要執行多個"子任務",我們把程序內的多個"子任務"叫做執行緒
- 執行緒通常叫做輕型的程序,執行緒是共享記憶體空間,併發執行的多工,每一個執行緒都共享一個程序的資源
- 執行緒是最小的執行單元而程序由至少一個執行緒組成。如何排程程序和執行緒,完全由作業系統來決定,程式自己不能決定什麼時候執行,執行多長時間
模組:
1、_thread模組 低階模組(更接近底層)
2、threading模組 高階模組,對_thread進行了封裝
2、啟動一個執行緒
同樣,先給一個多執行緒的例子,其中,仍然使用run函式作為其中的一個子執行緒,主函式為父執行緒。通過threading的Thread方法建立執行緒並開啟,join來等待子執行緒。
1 import threading
2 import time
3
4
5 def run():
6 print("子執行緒(%s)啟動" % (threading.current_thread().name))
7
8 # 實現執行緒的功能
9 time.sleep(1)
10 print("列印")
11 time.sleep(2)
12
13 print("子執行緒(%s)結束" % (threading.current_thread().name))
14
15
16 if __name__ == "__main__":
17 # 任何程序都預設會啟動一個執行緒,稱為主執行緒,主執行緒可以啟動新的子執行緒
18 # current_thread():返回執行緒的例項
19 print("主執行緒(%s)啟動" % (threading.current_thread().name))
20
21 # 建立子執行緒
22 t = threading.Thread(target=run, name="runThread")
23 t.start()
24
25 # 等待執行緒結束
26 t.join()
27
28 print("主執行緒(%s)結束" % (threading.current_thread().name))
3、執行緒間資料共享
多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自有一份拷貝存在每個程序中,互不影響。
而多執行緒所有變數都由所有執行緒共享。所以任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享資料最大的危險在於多個執行緒同時修改一個變數,容易把內容改亂了。
1 import threading
2
3
4 num = 10
5
6 def run(n):
7 global num
8 for i in range(10000000):
9 num = num + n
10 num = num - n
11
12 if __name__ == "__main__":
13 t1 = threading.Thread(target=run, args=(6,))
14 t2 = threading.Thread(target=run, args=(9,))
15
16 t1.start()
17 t2.start()
18 t1.join()
19 t2.join()
20
21 print("num = ",num)
4、執行緒鎖
在第三小點中已經提到了,多執行緒的一個缺點就是資料是共享的,如果有兩個執行緒正同時在修改這個資料,就會出現混亂,它自己也不知道該聽誰的了,尤其是在運算比較複雜,次數較多的時候,這種錯誤的機會會更大。
當然,解決辦法也是有的,那就是利用執行緒鎖。加鎖的意思就是在其中一個執行緒正在對資料進行操作時,讓其他執行緒不得介入。這個加鎖和釋放鎖是由人來確定的。
- 確保了這段程式碼只能由一個執行緒從頭到尾的完整執行
- 阻止了多執行緒的併發執行,要比不加鎖時候效率低。包含鎖的程式碼段只能以單執行緒模式執行
- 由於可以存在多個鎖,不同執行緒持有不同的鎖,並試圖獲取其他的鎖,可能造成死鎖導致多個執行緒掛起,只能靠作業系統強制終止
1 def run(n):
2 global num
3 for i in range(10000000):
4 lock.acquire()
5 try:
6 num = num + n
7 num = num - n
8 finally:
9 # 修改完釋放鎖
10 lock.release()
11
12 if __name__ == "__main__":
13 t1 = threading.Thread(target=run, args=(6,))
14 t2 = threading.Thread(target=run, args=(9,))
15
16 t1.start()
17 t2.start()
18 t1.join()
19 t2.join()
20
21 print("num = ",num)
上面這段程式是迴圈多次num+n-n+n-n的過程,變數n分別設為6和9是在兩個不同的執行緒當中,程式中已經加了鎖,你可以先去掉試一下,當迴圈次數較小的時候也許還能正確,但次數一旦取的較高就會出現混亂。
加鎖是在迴圈體當中,依次執行加減法,定義中說到確保一個執行緒從頭到尾的完整執行,也就是在計算途中,不會有其他的執行緒打擾。你可以想一下,如果一個執行緒執行完加法,正在執行減法,另一個執行緒進來了,它要先進行加法時的初始sum值該是多少呢,執行緒二不一定線上程一的什麼時候進來,萬一剛進來時候,執行緒一恰好給sum賦值了,而執行緒二仍然用的是正準備進來時候的sum值,那從這裡開始豈不已經分道揚鑣了。所以,運算的次數越多,結果會越離譜。
這個說完了,還有一個小小的改進。你是否記得讀寫檔案時候書寫的一種簡便形式,通過with來實現,可以避免我們忘記關閉檔案,自動幫我們關閉。當然還有一些其他地方也用到了這個方法。這裡也同樣適用。
1 # 與上面程式碼功能相同,with lock可以自動上鎖與解鎖
2 with lock:
3 num = num + n
4 num = num - n
5、ThreadLocal
- 建立一個全域性的ThreadLocal物件
- 每個執行緒有獨立的儲存空間
- 每個執行緒對ThreadLocal物件都可以讀寫,但是互不影響
根據名字也可以看出,也就是在本地建個連線,所有的操作在本地進行,每個執行緒之間沒有資料的影響。
1 import threading
2
3
4 num = 0
5 local = threading.local()
6
7 def run(x, n):
8 x = x + n
9 x = x - n
10
11 def func(n):
12 # 每個執行緒都有local.x
13 local.x = num
14 for i in range(10000000):
15 run(local.x, n)
16 print("%s-%d" % (threading.current_thread().name, local.x))
17
18
19 if __name__ == "__main__":
20 t1 = threading.Thread(target=func, args=(6,))
21 t2 = threading.Thread(target=func, args=(9,))
22
23 t1.start()
24 t2.start()
25 t1.join()
26 t2.join()
27
28 print("num = ",num)
6、控制執行緒數量
1 '''
2 控制執行緒數量是指控制執行緒同時觸發的數量,可以拿下來這段程式碼執行一下,下面啟動了5個執行緒,但是他們會兩個兩個的進行
3 '''
4 import threading
5 import time
6
7 # 控制併發執行執行緒的數量
8 sem = threading.Semaphore(2)
9
10 def run():
11 with sem:
12 for i in range(10):
13 print("%s---%d" % (threading.current_thread().name, i))
14 time.sleep(1)
15
16
17 if __name__ == "__main__":
18 for i in range(5):
19 threading.Thread(target=run).start()
上面的程式是有多個執行緒,但是每次限制同時執行的執行緒,通俗點說就是限制併發執行緒的上限;除此之外,也可以限制執行緒數量的下限,也就是至少達到多少個執行緒才能觸發。
1 import threading
2 import time
3
4
5 # 湊夠一定數量的執行緒才會執行,否則一直等著
6 bar = threading.Barrier(4)
7
8 def run():
9 print("%s--start" % (threading.current_thread().name))
10 time.sleep(1)
11 bar.wait()
12 print("%s--end" % (threading.current_thread().name))
13
14
15 if __name__ == "__main__":
16 for i in range(5):
17 threading.Thread(target=run).start()
7、定時執行緒
1 import threading
2
3
4 def run():
5 print("***********************")
6
7 # 延時執行執行緒
8 t = threading.Timer(5, run)
9 t.start()
10
11 t.join()
12 print("父執行緒結束")
8、執行緒通訊
1 import threading
2 import time
3
4
5 def func():
6 # 事件物件
7 event = threading.Event()
8 def run():
9 for i in range(5):
10 # 阻塞,等待事件的觸發
11 event.wait()
12 # 重置阻塞,使後面繼續阻塞
13 event.clear()
14 print("**************")
15 t = threading.Thread(target=run).start()
16 return event
17
18 e = func()
19
20 # 觸發事件
21 for i in range(5):
22 time.sleep(2)
23 e.set()
9、一個小栗子
這個例子是用了生產者和消費者來模擬,要進行資料通訊,還引入了佇列。先來理解一下。
1 import threading
2 import queue
3 import time
4 import random
5
6
7 # 生產者
8 def product(id, q):
9 while True:
10 num = random.randint(0, 10000)
11 q.put(num)
12 print("生產者%d生產了%d資料放入了佇列" % (id, num))
13 time.sleep(3)
14 # 任務完成
15 q.task_done()
16
17 # 消費者
18 def customer(id, q):
19 while True:
20 item = q.get()
21 if item is None:
22 break
23 print("消費者%d消費了%d資料" % (id, item))
24 time.sleep(2)
25 # 任務完成
26 q.task_done()
27
28
29 if __name__ == "__main__":
30 # 訊息佇列
31 q = queue.Queue()
32
33 # 啟動生產者
34 for i in range(4):
35 threading.Thread(target=product, args=(i, q)).start()
36
37 # 啟動消費者
38 for i in range(3):
39 threading.Thread(target=customer, args=(i, q)).start()
10、執行緒排程
1 import threading
2 import time
3
4
5 # 執行緒條件變數
6 cond = threading.Condition()
7
8
9 def run():
10 with cond:
11 for i in range(0, 10, 2):
12 print(threading.current_thread().name, i)
13 time.sleep(1)
14 cond.wait() # 阻塞
15 cond.notify() # 告訴另一個執行緒可以執行
16
17
18 def run2():
19 with cond:
20 for i in range(1, 10, 2):
21 print(threading.current_thread().name, i)
22 time.sleep(1)
23 cond.notify()
24 cond.wait()
25
26
27 threading.Thread(target=run).start()
28 threading.Thread(target=run2).start()
三、協程
1、協程
- 子程式/子函式:在所有語言中都是層級呼叫,比如A呼叫B,在B執行的工程中又可以呼叫C,C執行完畢返回,B執行完畢返回最後是A執行完畢。是通過棧實現的,一個執行緒就是一個子程式,子程式呼叫總是一個入口,一次返回,呼叫的順序是明確的
- 協程:看上去也是子程式,但執行過程中,在子程式的內部可中斷,然後轉而執行別的子程式,不是函式呼叫,有點類似CPU中斷
1 # 這是一個子程式的呼叫
2 def C():
3 print("C--start")
4 print("C--end")
5
6 def B():
7 print("B--start")
8 C()
9 print("B--end")
10
11 def A():
12 print("A--start")
13 B()
14 print("A--end")
15
16 A()
- 協程與子程式呼叫的結果類似,但不是通過在函式中呼叫另一個函式
- 協程執行起來有點像執行緒,但協程的特點在於是一個執行緒
- 與執行緒相比的優點:協程的執行效率極高,因為只有一個執行緒,也不存在同時寫變數的衝突,在協程中共享資源不加鎖,只需要判斷狀態
2、協程的原理
1 # python對協程的支援是通過generator實現的
2 def run():
3 print(1)
4 yield 10
5 print(2)
6 yield 20
7 print(3)
8 yield 30
9
10 # 協程的最簡單風格,控制函式的階段執行,節約執行緒或者程序的切換
11 # 返回值是一個生成器
12 m = run()
13 print(next(m))
14 print(next(m))
15 print(next(m))
3、資料傳輸
1 # python對協程的支援是通過generator實現的
2 def run():
3 print(1)
4 yield 10
5 print(2)
6 yield 20
7 print(3)
8 yield 30
9
10 # 協程的最簡單風格,控制函式的階段執行,節約執行緒或者程序的切換
11 # 返回值是一個生成器
12 m = run()
13 print(next(m))
14 print(next(m))
15 print(next(m))
4、小栗子
1 def product(c):
2 c.send(None)
3 for i in range(5):
4 print("生產者產生資料%d" % (i))
5 r = c.send(str(i))
6 print("消費者消費了資料%s" % (r))
7 c.close()
8
9
10 def customer():
11 data = ""
12 while True:
13 n = yield data
14 if not n:
15 return
16 print("消費者消費了%s" % (n))
17 data = "200"
18
19
20 c = customer()
21 product(c)