同步非同步與協成
阿新 • • 發佈:2018-11-13
同步|非同步:
執行緒的三種狀態:
1.就緒
2.執行
3.阻塞
阻塞和非阻塞描述的是執行的狀態
阻塞 :遇到了IO操作,程式碼卡住,無法執行下一行,CPU會切換到其他任務
非阻塞 :與阻塞相反,程式碼正在執行(執行狀態) 或處於就緒狀態
同步和非同步指的是提交任務的方式
同步 :提交任務必須等待任務完成,才能執行下一行
非同步 :提交任務不需要等待任務完成,立即執行下一行
程式碼:
1 def task(): 2 for i in range(1000000): 3 i += 1000 4 print("11111") 5 6 print("start") 7 task() # 同步提交方式,等函式執行完菜執行下一行 8 print("end") 9 10 from threading import Thread 11 12 print("start1") 13 Thread(target=task).start() # 非同步提交,開啟執行緒,然後去執行之後的程式碼,執行緒內程式碼自行執行 14 print("end1")
非同步回撥:任務執行結束後自動呼叫某個函式
為什麼需要回調? 子程序幫助主程序完成任務,處理任務的結果應該交還給主程序 其他方式也可以將資料交還給主程序 1.shutdown 主程序會等到所有任務完成2.result函式 會阻塞直到任務完成 都會阻塞,導致效率降低,所以使用回撥 注意: 回撥函式什麼時候被執行? 子程序任務完成時 誰在執行回撥函式? 主程序 執行緒的非同步回撥 使用方式都相同,唯一的不同是執行回撥函式,是子執行緒在執行(執行緒間資料共享)
三種方式:
1 # 方式1 自己來儲存資料 並執行shutdown 僅在多執行緒 2 3 res = [] 4 def task(): 5 print("%s is 正在打水" % os.getpid()) 6 time.sleep(0.2) 7 w = "%s 打的水" % os.getpid() 8 res.append(w) 9 return w 10 11 if __name__ == '__main__': 12 for i in range(20): 13 # 提交任務會返回一個物件 用於回去執行狀態和結果 14 f = pool.submit(task) 15 print(f.result()) # 方式2 執行result 它是阻塞的直到任務完成 又變成串行了 16 17 print("11111") 18 # pool.shutdown() # 首先不允許提交新任務 然後等目前所有任務完成後 19 # print(res) 20 print("over") 21 22 ==================================================================================== 23 24 pool = ThreadPoolExecutor() 25 26 # 方式3 通過回撥(什麼是回撥 任務執行結束後自動呼叫某個函式) 27 def task(): 28 print("%s is 正在打水" % os.getpid()) 29 # time.sleep(0.2) 30 w = "%s 打的水" % os.getpid() 31 return w 32 33 def task_finish(res): 34 print("打水完成! %s" % res) 35 36 if __name__ == '__main__': 37 for i in range(20): 38 # 提交任務會返回一個物件 用於回去執行狀態和結果 39 f = pool.submit(task) 40 f.add_done_callback(task_finish) #新增完成後的回撥 41 print("11111") 42 print("over")
利用回撥完成生產者消費者:
多程序:
1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 2 from threading import current_thread 3 import os 4 # 程序池 5 pool = ProcessPoolExecutor() 6 # 爬蟲:從網路某個地址獲取一個HTML檔案 7 import requests # 該模組用於網路(HTTP)請求 8 9 # 生產資料,即生產者 10 def get_data_task(url): 11 print(os.getpid(),"正在生產資料!") 12 # print(current_thread(),"正在生產資料!") 13 14 response = requests.get(url) 15 text = response.content.decode("utf-8") 16 print(text) 17 return text 18 19 # 處理資料,即消費者 20 def parser_data(f): 21 print(os.getpid(),"處理資料") 22 # print(current_thread(), "處理資料") 23 print("正在解析: 長度%s" % len(f.result())) 24 25 urls = [ 26 "http://www.baidu.com", 27 "http://www.baidu.com", 28 "http://www.baidu.com", 29 "http://www.baidu.com" 30 ] 31 32 if __name__ == '__main__': 33 for url in urls: 34 f = pool.submit(get_data_task,url) 35 f.add_done_callback(parser_data) # 回撥函式是主程序在執行 36 # 因為子程序是負責獲取資料的,然而資料怎麼處理 ,子程序並不知道.應該把資料還給主程序 37 print("over")
多執行緒:
1 from concurrent.futures import ThreadPoolExecutor 2 from threading import current_thread 3 # 程序池 4 pool = ThreadPoolExecutor() 5 6 # 爬蟲:從網路某個地址獲取一個HTML檔案 7 import requests # 該模組用於網路(HTTP)請求 8 9 # 生產資料 10 def get_data_task(url): 11 # print(os.getpid(),"正在生產資料!") 12 print(current_thread(),"正在生產資料!") 13 14 response = requests.get(url) 15 text = response.content.decode("utf-8") 16 print(text) 17 return text 18 19 # 處理資料 20 def parser_data(f): 21 # print(os.getpid(),"處理資料") 22 print(current_thread(), "處理資料") 23 print("正在解析: 長度%s" % len(f.result())) 24 25 urls = [ 26 "http://www.baidu.com", 27 "http://www.baidu.com", 28 "http://www.baidu.com", 29 "http://www.baidu.com" 30 ] 31 32 if __name__ == '__main__': 33 for url in urls: 34 f = pool.submit(get_data_task,url) 35 f.add_done_callback(parser_data) # 回撥函式是主程序在執行 36 # 因為子程序是負責獲取資料的 然而資料怎麼處理 子程序並不知道 應該把資料還給主程序 37 print("over")
執行緒佇列:
普通佇列/堆疊佇列/優先順序佇列:
import queue # 普通佇列 先進先出 q = queue.Queue() q.put("a") q.put("b") print(q.get()) print(q.get()) # 堆疊佇列 先進後出 後進先出 函式呼叫就是進棧 函式結束就出棧 遞迴造成棧溢位 q2 = queue.LifoQueue() q2.put("a") q2.put("b") print(q2.get()) # 優先順序佇列 q3 = queue.PriorityQueue() # 數值越小優先順序越高 優先順序相同時 比較大小 小的先取 q3.put((-100, "c")) q3.put((1, "a")) q3.put((100, "b")) print(q3.get())
協成:在單執行緒實現併發
協程的目的是在單執行緒下實現併發 為什麼出現協程? 因為cpython中,由於GIL而導致同一時間只有一個執行緒在跑 意味著:如果你的程式時計算密集,多執行緒效率也不會提升 如果是io密集型 沒有必要在單執行緒下實現併發,我會開啟多執行緒來處理io,子線遇到io,cpu切走. 不能保證一定切到主線 如果可以,我在遇到io的時候轉而去做計算,這樣一來可以保證cpu一直在處理你的程式,當然處理時間太長也要切走 總結:單線下實現併發,是將io阻塞時間用於執行計算,可以提高效率 原理:一直使用CPU直到超時 怎麼實現單執行緒併發? 併發:指的是看起來像是同時執行,實際是在任務間來回切換,同時需要儲存執行的狀態 任務一堆程式碼 可以用函式裝起來 1.如何讓兩個函式切換執行 yield可以儲存函式的執行狀態 通過生成器可以實現偽併發 併發不一定提升效率,當任務全是計算時,反而會降低效率 2.如何知道發生了io, 從而切換執行? 第三方模組,gevent 第三方模組 greenlet 可以實現併發 但是不能檢測io 第三方模組 gevent 封裝greenlet 可以實現單執行緒併發,並且能夠檢測io操作,自動切換
協程的應用場景:
TCP 多客戶端實現方式
1.來一個客戶端就來一個程序 資源消耗較大
2.來一個客戶端就來一個執行緒 也不能無限開
3.用程序池 或 執行緒池 還是一個執行緒或程序只能維護一個連線
4.協程 一個執行緒就可以處理多個客戶端 遇到io就切到另一個
協成實現:單執行緒實現併發
1 # 這是一個程序,預設包含一個主執行緒 2 import time
#生成器函式 3 def task(): 4 while True: 5 print("task1") 6 time.sleep(1)#I/O,CPU切走 7 yield 1 8 9 def task2(): 10 g = task() 11 while True: 12 try: 13 print("task2") 14 next(g)#next()函式引數傳一個可迭代物件 15 except Exception: 16 print("任務完成") 17 break 18 task2() 19 列印結果: 20 task2 21 task1 22 task2 23 task1 24 task2 25 task1 26 ..........
greenlet模組:不能檢測I/O
# 1.例項化greenlet得到一個物件,傳入要執行的任務,至少需要兩個任務 # 2.先讓某個任務執行起來,使用物件呼叫switch # 3.在任務的執行過程中,手動呼叫switch來切換
1 import greenlet 2 import time 3 def task1(): 4 print("task1 1") 5 time.sleep(2) 6 g2.switch() 7 print("task1 2") 8 g2.switch() 9 10 def task2(): 11 print("task2 1") 12 g1.switch() 13 print("task2 2") 14 15 g1 = greenlet.greenlet(task1) 16 g2 = greenlet.greenlet(task2) 17 18 g1.switch()
gevent:可以實現單執行緒併發,並且能夠檢測io操作,自動切換
# 1.spawn函式傳入你的任務 # 2.呼叫join 去開啟任務 # 3.檢測io操作需要打monkey補丁,就是一個函式,在程式最開始的地方呼叫它
1 from gevent import monkey 2 monkey.patch_all() 3 4 import gevent 5 import time 6 def eat(): 7 print('eat food 1') 8 time.sleep(2) 9 print('eat food 2') 10 11 def play(): 12 print('play 1') 13 time.sleep(1) 14 print('play 2') 15 16 g1=gevent.spawn(eat) 17 g2=gevent.spawn(play) 18 19 gevent.joinall([g1,g2]) 20 print('主')