1. 程式人生 > >同步非同步與協成

同步非同步與協成

同步|非同步:

執行緒的三種狀態:
  1.就緒
  2.執行
  3.阻塞
阻塞和非阻塞描述的是執行的狀態
阻塞 :遇到了IO操作,程式碼卡住,無法執行下一行,CPU會切換到其他任務
非阻塞 :與阻塞相反,程式碼正在執行(執行狀態) 或處於就緒狀態

同步和非同步指的是提交任務的方式
同步 :提交任務必須等待任務完成,才能執行下一行
非同步 :提交任務不需要等待任務完成,立即執行下一行

程式碼:

 1 def task():
 2     for i in range(1000000):
 3         i += 1000
 4     print("11111")
 5 
 6 print("start
") 7 task() # 同步提交方式,等函式執行完菜執行下一行 8 print("end") 9 10 from threading import Thread 11 12 print("start1") 13 Thread(target=task).start() # 非同步提交,開啟執行緒,然後去執行之後的程式碼,執行緒內程式碼自行執行 14 print("end1")

非同步回撥:任務執行結束後自動呼叫某個函式

為什麼需要回調?
    子程序幫助主程序完成任務,處理任務的結果應該交還給主程序
其他方式也可以將資料交還給主程序
    1.shutdown  主程序會等到所有任務完成
    
2.result函式 會阻塞直到任務完成 都會阻塞,導致效率降低,所以使用回撥 注意: 回撥函式什麼時候被執行? 子程序任務完成時 誰在執行回撥函式? 主程序 執行緒的非同步回撥 使用方式都相同,唯一的不同是執行回撥函式,是子執行緒在執行(執行緒間資料共享)

三種方式:

 1 # 方式1 自己來儲存資料 並執行shutdown  僅在多執行緒
 2 
 3 res = []
 4 def task():
 5     print("%s is 正在打水" % os.getpid())
 6     time.sleep(0.2)
 7     w = "
%s 打的水" % os.getpid() 8 res.append(w) 9 return w 10 11 if __name__ == '__main__': 12 for i in range(20): 13 # 提交任務會返回一個物件 用於回去執行狀態和結果 14 f = pool.submit(task) 15 print(f.result()) # 方式2 執行result 它是阻塞的直到任務完成 又變成串行了 16 17 print("11111") 18 # pool.shutdown() # 首先不允許提交新任務 然後等目前所有任務完成後 19 # print(res) 20 print("over") 21 22 ==================================================================================== 23 24 pool = ThreadPoolExecutor() 25 26 # 方式3 通過回撥(什麼是回撥 任務執行結束後自動呼叫某個函式) 27 def task(): 28 print("%s is 正在打水" % os.getpid()) 29 # time.sleep(0.2) 30 w = "%s 打的水" % os.getpid() 31 return w 32 33 def task_finish(res): 34 print("打水完成! %s" % res) 35 36 if __name__ == '__main__': 37 for i in range(20): 38 # 提交任務會返回一個物件 用於回去執行狀態和結果 39 f = pool.submit(task) 40 f.add_done_callback(task_finish) #新增完成後的回撥 41 print("11111") 42 print("over")

利用回撥完成生產者消費者:

多程序:
1 from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 2 from threading import current_thread
 3 import  os
 4 # 程序池
 5 pool = ProcessPoolExecutor()
 6 # 爬蟲:從網路某個地址獲取一個HTML檔案
 7 import requests # 該模組用於網路(HTTP)請求
 8 
 9 # 生產資料,即生產者
10 def get_data_task(url):
11     print(os.getpid(),"正在生產資料!")
12     # print(current_thread(),"正在生產資料!")
13 
14     response = requests.get(url)
15     text = response.content.decode("utf-8")
16     print(text)
17     return text
18 
19 # 處理資料,即消費者
20 def parser_data(f):
21     print(os.getpid(),"處理資料")
22     # print(current_thread(), "處理資料")
23     print("正在解析: 長度%s" % len(f.result()))
24 
25 urls = [
26     "http://www.baidu.com",
27     "http://www.baidu.com",
28     "http://www.baidu.com",
29     "http://www.baidu.com"
30 ]
31 
32 if __name__ == '__main__':
33     for url in urls:
34         f = pool.submit(get_data_task,url)
35         f.add_done_callback(parser_data)  # 回撥函式是主程序在執行
36         # 因為子程序是負責獲取資料的,然而資料怎麼處理 ,子程序並不知道.應該把資料還給主程序
37     print("over")
多執行緒:
 1 from  concurrent.futures import ThreadPoolExecutor
 2 from threading import current_thread
 3 # 程序池
 4 pool = ThreadPoolExecutor()
 5 
 6 # 爬蟲:從網路某個地址獲取一個HTML檔案
 7 import requests # 該模組用於網路(HTTP)請求
 8 
 9 # 生產資料
10 def get_data_task(url):
11     # print(os.getpid(),"正在生產資料!")
12     print(current_thread(),"正在生產資料!")
13 
14     response = requests.get(url)
15     text = response.content.decode("utf-8")
16     print(text)
17     return text
18 
19 #   處理資料
20 def parser_data(f):
21     # print(os.getpid(),"處理資料")
22     print(current_thread(), "處理資料")
23     print("正在解析: 長度%s" % len(f.result()))
24 
25 urls = [
26     "http://www.baidu.com",
27     "http://www.baidu.com",
28     "http://www.baidu.com",
29     "http://www.baidu.com"
30 ]
31 
32 if __name__ == '__main__':
33     for url in urls:
34         f = pool.submit(get_data_task,url)
35         f.add_done_callback(parser_data)  # 回撥函式是主程序在執行
36         # 因為子程序是負責獲取資料的  然而資料怎麼處理 子程序並不知道  應該把資料還給主程序
37     print("over")

 執行緒佇列:

普通佇列/堆疊佇列/優先順序佇列:

import queue

# 普通佇列 先進先出
q = queue.Queue()
q.put("a")
q.put("b")

print(q.get())
print(q.get())

# 堆疊佇列  先進後出 後進先出  函式呼叫就是進棧  函式結束就出棧 遞迴造成棧溢位
q2 = queue.LifoQueue()
q2.put("a")
q2.put("b")
print(q2.get())

# 優先順序佇列
q3 = queue.PriorityQueue()  # 數值越小優先順序越高  優先順序相同時 比較大小 小的先取
q3.put((-100, "c"))
q3.put((1, "a"))
q3.put((100, "b"))
print(q3.get())

協成:在單執行緒實現併發

協程的目的是在單執行緒下實現併發
為什麼出現協程? 因為cpython中,由於GIL而導致同一時間只有一個執行緒在跑
意味著:如果你的程式時計算密集,多執行緒效率也不會提升
      如果是io密集型 沒有必要在單執行緒下實現併發,我會開啟多執行緒來處理io,子線遇到io,cpu切走.
      不能保證一定切到主線
      如果可以,我在遇到io的時候轉而去做計算,這樣一來可以保證cpu一直在處理你的程式,當然處理時間太長也要切走

 總結:單線下實現併發,是將io阻塞時間用於執行計算,可以提高效率
 原理:一直使用CPU直到超時
怎麼實現單執行緒併發?
    併發:指的是看起來像是同時執行,實際是在任務間來回切換,同時需要儲存執行的狀態
    任務一堆程式碼  可以用函式裝起來
    1.如何讓兩個函式切換執行
        yield可以儲存函式的執行狀態
        通過生成器可以實現偽併發
        併發不一定提升效率,當任務全是計算時,反而會降低效率
    2.如何知道發生了io, 從而切換執行?
        第三方模組,gevent
第三方模組 greenlet 可以實現併發 但是不能檢測io
第三方模組 gevent 封裝greenlet 可以實現單執行緒併發,並且能夠檢測io操作,自動切換
協程的應用場景:
TCP 多客戶端實現方式
1.來一個客戶端就來一個程序 資源消耗較大
2.來一個客戶端就來一個執行緒 也不能無限開
3.用程序池 或 執行緒池 還是一個執行緒或程序只能維護一個連線
4.協程 一個執行緒就可以處理多個客戶端 遇到io就切到另一個

協成實現:單執行緒實現併發

 1 # 這是一個程序,預設包含一個主執行緒
 2 import time
  #生成器函式
3 def task(): 4 while True: 5 print("task1") 6 time.sleep(1)#I/O,CPU切走 7 yield 1 8 9 def task2(): 10 g = task() 11 while True: 12 try: 13 print("task2") 14 next(g)#next()函式引數傳一個可迭代物件 15 except Exception: 16 print("任務完成") 17 break 18 task2() 19 列印結果: 20 task2 21 task1 22 task2 23 task1 24 task2 25 task1 26 ..........

greenlet模組:不能檢測I/O

# 1.例項化greenlet得到一個物件,傳入要執行的任務,至少需要兩個任務
# 2.先讓某個任務執行起來,使用物件呼叫switch
# 3.在任務的執行過程中,手動呼叫switch來切換
 1 import greenlet
 2 import time
 3 def task1():
 4     print("task1 1")
 5     time.sleep(2)
 6     g2.switch()
 7     print("task1 2")
 8     g2.switch()
 9 
10 def task2():
11     print("task2 1")
12     g1.switch()
13     print("task2 2")
14 
15 g1 = greenlet.greenlet(task1)
16 g2 = greenlet.greenlet(task2)
17 
18 g1.switch()

gevent:可以實現單執行緒併發,並且能夠檢測io操作,自動切換

# 1.spawn函式傳入你的任務
# 2.呼叫join 去開啟任務
# 3.檢測io操作需要打monkey補丁,就是一個函式,在程式最開始的地方呼叫它
 1 from gevent import monkey
 2 monkey.patch_all()
 3 
 4 import gevent
 5 import time
 6 def eat():
 7     print('eat food 1')
 8     time.sleep(2)
 9     print('eat food 2')
10 
11 def play():
12     print('play 1')
13     time.sleep(1)
14     print('play 2')
15 
16 g1=gevent.spawn(eat)
17 g2=gevent.spawn(play)
18 
19 gevent.joinall([g1,g2])
20 print('')