小白對非同步IO的理解
前言
看到越來越多的大佬都在使用python的非同步IO,協程等概念來實現高效的IO處理過程,可是我對這些概念還不太懂,就學習了一下。 因為是初學者,在理解上有很多不到位的地方,如果有錯誤,還希望能夠有人積極幫我指出。
下面就使用一個簡單的爬蟲的例子,通過一步一步的改進,最後來用非同步IO的方式實現。
1. 阻塞的IO
我們要實現一個爬蟲,去爬百度首頁n次,最簡單的想法就是依次下載,從建立socket連線到傳送網路請求再到讀取響應資料,順序進行。
程式碼如下:
1 import time 2 import socket 3 import sys 4 5 defdoRequest(): 6 sock = socket.socket() 7 sock.connect(('www.baidu.com',80)) 8 sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8")) 9 response = sock.recv(1024) 10 return response 11 12 def main(): 13 start = time.time() 14 for i inrange(int(sys.argv[1])): 15 doRequest() 16 print("spend time : %s" %(time.time()-start)) 17 18 main()
因為socket是阻塞方式呼叫的,所以cpu執行到sock.connect()
,sock.recv()
,就會一直卡在那裡直到socket的狀態就緒,所以浪費了很多的CPU時間。
請求10次和20次的時間分別如下所示:
1 ➜ python3 1.py 10 2 spend time : 0.9282660484313965 3 ➜ python3 1.py 20 4spend time : 1.732438087463379
可以看到,速度慢的跟蝸牛一樣。
2. 改進1-併發
為了加快請求的速度,很容易想到我們可以使用併發的方式進行,那麼最好的方式就是使用多執行緒了。修改後的程式碼如下:
1 # 多執行緒 2 3 import time 4 import socket 5 import sys 6 import threading 7 8 def doRequest(): 9 sock = socket.socket() 10 sock.connect(('www.baidu.com',80)) 11 sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8")) 12 response = sock.recv(1024) 13 return response 14 15 def main(): 16 start = time.time() 17 threads = [] 18 for i in range(int(sys.argv[1])): 19 # doRequest() 20 threads.append(threading.Thread(target=doRequest,args=())) 21 for i in threads: 22 i.start() 23 for i in threads: 24 i.join() 25 print("spend time : %s" %(time.time()-start))
使用執行緒之後,看一下請求10次和20次的時間:
1 ➜ python3 2.py 10 2 spend time : 0.1124269962310791 3 ➜ python3 2.py 20 4 spend time : 0.15438294410705566
速度明顯快了很多,幾乎是剛才的10倍了。
但是python的執行緒是有問題的,因為一個python程序中,同一時刻只允許一個執行緒執行,正在執行的執行緒會獲取到GPL。做阻塞的系統呼叫時,例如sock.connect()
,sock.recv()
時,當前執行緒會釋放GIL,讓別的執行緒有機會獲取GPL,然後執行。但是這種獲取GPL的排程策略是搶佔式的,以保證同等優先順序的執行緒都有均等的執行機會,那帶來的問題是:並不知道下一時刻是哪個執行緒被執行,也不知道它正要執行的程式碼是什麼。所以就可能存在競態條件。這種競爭有可能使某些執行緒處於劣勢,導致一直獲取不到GPL
比如如下的情況,執行緒1執行的程式碼如下:
1 flag = True 2 while flag: 3 ..... # 這裡省略一些複雜的操作,會呼叫多次IO操作 4 time.sleep(1)
可以看到,執行緒1的任務非常簡單,而執行緒2的任務非常複雜,這就會導致CPU不停地去執行執行緒1,而真正做實際工作的執行緒2卻很少被排程到,導致了浪費了大量的CPU資源。
3. 改進2-非阻塞方式
在第一個例子中,我們意識到浪費了大量的時間,是因為我們用了阻塞的IO,導致CPU在卡在那裡等待IO的就緒,那使用非阻塞的IO,是不是就可以解決這個問題了。
程式碼如下:
1 import time 2 import socket 3 import sys 4 5 def doRequest(): 6 sock = socket.socket() 7 sock.setblocking(False) 8 try: 9 sock.connect(('www.baidu.com',80)) 10 except BlockingIOError: 11 pass 12 13 # 因為設定為非阻塞模式了,不知道何時socket就緒,需要不停的監控socket的狀態 14 while True: 15 try: 16 sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8")) 17 # 直到send 不丟擲異常,就傳送成功了 18 break 19 except OSError: 20 pass 21 while True: 22 try: 23 response = sock.recv(1024) 24 break 25 except OSError: 26 pass 27 return response 28 def main(): 29 start = time.time() 30 for i in range(int(sys.argv[1])): 31 doRequest() 32 print("spend time : %s" %(time.time()-start)) 33 34 main()
sock.setblocking(False)
把socket設定為非阻塞式的,也就是說執行完sock.connect()
和sock.recv()
之後,CPU不再等待IO了,會繼續往下執行,來看一下執行時間:
1 ➜ python3 3.py 10 2 spend time : 1.0597507953643799 3 ➜ python3 3.py 20 4 spend time : 2.0327329635620117
感覺被騙了,速度還是跟第一個一樣啊,看來非阻塞IO並沒有什麼速度上的提升啊,問題出在哪裡呢?看程式碼發現多了兩個while迴圈:
1 while True: 2 try: 3 sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8")) 4 # 直到send 不丟擲異常,就傳送成功了 5 break 6 except OSError: 7 pass 8 while True: 9 try: 10 response = sock.recv(1024) 11 break 12 except OSError: 13 pass
因為把socket設定為非阻塞模式了,所以CPU並不知道IO什麼時候就緒,所以必須在這裡不停的嘗試,直到IO可以使用了為止。
雖然 connect() 和 recv() 不再阻塞主程式,空出來的時間段CPU沒有空閒著,但並沒有利用好這空閒去做其他有意義的事情,而是在迴圈嘗試讀寫 socket (不停判斷非阻塞呼叫的狀態是否就緒)。
有沒有辦法讓CPU空閒出來的時間,不用來不停的詢問IO,而是幹別的更有意義的事情呢,等IO就緒後再通知CPU回來處理呢?當然有了,那就是回撥。
4. 改進3-回撥
作業系統已經把IO狀態的改變封裝成了事件,如可讀事件、可寫事件。並且可以為這些事件繫結處理函式。所以我們可以使用這種方式,為socket的IO狀態的變化繫結處理函式,交給系統進行調動,這樣就是回撥方式。python的select模組支援這樣的操作。
程式碼如下:
1 import socket 2 from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ 3 import sys 4 selector = DefaultSelector() 5 stopped = False 6 urls_todo = [] 7 8 class Crawler: 9 def __init__(self, url): 10 self.url = url 11 self.sock = None 12 self.response = b'' 13 14 def fetch(self): 15 self.sock = socket.socket() 16 self.sock.setblocking(False) 17 try: 18 self.sock.connect(('www.baidu.com', 80)) 19 except BlockingIOError: 20 pass 21 selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) 22 23 def connected(self, key, mask): 24 selector.unregister(key.fd) 25 get = 'GET {0} HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.format(self.url) 26 self.sock.send(get.encode('ascii')) 27 selector.register(key.fd, EVENT_READ, self.read_response) 28 29 def read_response(self, key, mask): 30 global stopped 31 # 如果響應大於4KB,下一次迴圈會繼續讀 32 chunk = self.sock.recv(4096) 33 if chunk: 34 self.response += chunk 35 else: 36 selector.unregister(key.fd) 37 urls_todo.remove(self.url) 38 if not urls_todo: 39 stopped = True 40 41 def loop(): 42 while not stopped: 43 # 阻塞, 直到一個事件發生 44 events = selector.select() 45 for event_key, event_mask in events: 46 callback = event_key.data 47 callback(event_key, event_mask) 48 49 if __name__ == '__main__': 50 import time 51 start = time.time() 52 for i in range(int(sys.argv[1])): 53 urls_todo.append("/"+str(i)) 54 crawler = Crawler("/"+str(i)) 55 crawler.fetch() 56 loop() 57 print("spend time : %s" %(time.time()-start))
監控socket的狀態,如果變為可寫的,就往裡面寫資料
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
監控socket的狀態,如果變為可讀的,就外讀資料
selector.register(key.fd, EVENT_READ, self.read_response)
測試一下速度:
1 ➜ python3 4.py 10 2 spend time : 0.03910994529724121 3 ➜ python3 4.py 20 4 spend time : 0.04195284843444824
我們看到速度已經有個一個質的飛躍了,但是回撥用一些嚴重的問題,會破壞程式碼的本來的邏輯結構,造成程式碼可讀性很差。
比如我們有函式 funcA,funcB,funcC三個函式,如果funcC處理的結果依賴於funcB的處理結果,funcB的處理結果依賴於funcA的處理結果,而funcA又是回撥的方式呼叫的,所以就不知道funcA什麼時候返回,所以只能將後續的處理都作為callback的方式傳入funcA中,讓funcA執行完了可以執行funcB,funcB執行完了可以執行funcC,看起來像下面這樣:
funcA(funcB(funcC()))
這就形成了一個鏈式的回撥,跟最初的程式碼邏輯完全相反,本來的邏輯應該是這樣的。
funcC(funcB(funcA()))
因為這樣的鏈式回撥的出現,導致了理解程式碼邏輯困難,並且錯誤處理困難。
有沒有方法避免這種地獄式的鏈式回撥的呢?
5 .改進4-利用生成器
可以利用python的生成器,把發請求的函式寫成一個生成器,然後只監控IO的狀態,當IO狀態發生改變之後,就給生成器傳送值,驅動生成器進行下一步操作,這樣就可以避免回調了,具體實現如下:
1 import select 2 import socket 3 import time 4 import sys 5 6 num = int(sys.argv[1]) 7 8 def coroutine(): 9 sock = socket.socket() 10 sock.setblocking(0) 11 address = yield sock 12 try: 13 sock.connect(address) 14 except BlockingIOError: 15 pass 16 data = yield 17 size = yield sock.send(data) 18 yield sock.recv(size) 19 20 def main(): 21 inputs = [] 22 outputs = [] 23 coros = [] 24 coro_dict = dict() 25 for i in range(num): 26 coros.append(coroutine()) 27 sock = coros[-1].send(None) # 傳送一個None值來啟動生成器 28 outputs.append(sock.fileno()) # 29 # print(outputs) 30 coro_dict[sock.fileno()] = coros[-1] 31 coros[-1].send(('www.baidu.com', 80)) 32 while True: 33 r_list,w_list,e_list = select.select(inputs,outputs, ()) 34 for i in w_list: 35 # print(type(i)) 36 coro_dict[i].send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n') 37 outputs.remove(i) 38 inputs.append(i) 39 for i in r_list: 40 coro_dict[i].send(1024) 41 inputs.remove(i) 42 if len(inputs) == len(outputs) == 0: 43 break 44 # time.sleep(2) 45 # coro.send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n') 46 # select.select(wait_list, (), ()) 47 # print(coro.send(1024)) 48 49 start = time.time() 50 main() 51 print("spend time : %s" %(time.time()-start))
可以看到把發起請求的函式寫成了一個生成器:
1 def coroutine(): 2 sock = socket.socket() 3 sock.setblocking(0) 4 address = yield sock 5 try: 6 sock.connect(address) 7 except BlockingIOError: 8 pass 9 data = yield 10 size = yield sock.send(data) 11 yield sock.recv(size)
然後監控IO狀態,當IO狀態發生改變之後,驅動生成器繼續執行。
1 while True: 2 r_list,w_list,e_list = select.select(inputs,outputs, ()) 3 for i in w_list: 4 # print(type(i)) 5 coro_dict[i].send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n') 6 outputs.remove(i) 7 inputs.append(i) 8 for i in r_list: 9 coro_dict[i].send(1024) 10 inputs.remove(i)
看一下程式執行時間:
1 ➜ python3 5.py 10 2 spend time : 0.058114051818847656 3 ➜ python3 5.py 20 4 spend time : 0.0949699878692627
效果貌似非常的棒啊,執行的太快了,但是當我執行300次請求的時候,我就發現問題了,返回的非常慢,。估計原因可能是select是順序遍歷每一個IO描述符的去做狀態檢查,當IO描述符過多的時候,會導致遍歷的速度比較慢,所以造成時間花費很大。