1. 程式人生 > >小白對非同步IO的理解

小白對非同步IO的理解

前言

看到越來越多的大佬都在使用python的非同步IO,協程等概念來實現高效的IO處理過程,可是我對這些概念還不太懂,就學習了一下。 因為是初學者,在理解上有很多不到位的地方,如果有錯誤,還希望能夠有人積極幫我指出。

下面就使用一個簡單的爬蟲的例子,通過一步一步的改進,最後來用非同步IO的方式實現。

1. 阻塞的IO

我們要實現一個爬蟲,去爬百度首頁n次,最簡單的想法就是依次下載,從建立socket連線到傳送網路請求再到讀取響應資料,順序進行。

程式碼如下:

 1 import time 
 2 import socket
 3 import sys
 4  
 5 def
doRequest(): 6 sock = socket.socket() 7 sock.connect(('www.baidu.com',80)) 8 sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8")) 9 response = sock.recv(1024) 10 return response 11 12 def main(): 13 start = time.time() 14 for i in
range(int(sys.argv[1])): 15 doRequest() 16 print("spend time : %s" %(time.time()-start)) 17 18 main()

因為socket是阻塞方式呼叫的,所以cpu執行到sock.connect(),sock.recv(),就會一直卡在那裡直到socket的狀態就緒,所以浪費了很多的CPU時間。

 

請求10次和20次的時間分別如下所示:

1 ➜ python3 1.py  10
2 spend time : 0.9282660484313965
3 ➜ python3 1.py  20
4
spend time : 1.732438087463379

可以看到,速度慢的跟蝸牛一樣。

2. 改進1-併發

為了加快請求的速度,很容易想到我們可以使用併發的方式進行,那麼最好的方式就是使用多執行緒了。修改後的程式碼如下:

 1 # 多執行緒
 2  
 3 import time 
 4 import socket
 5 import sys
 6 import threading 
 7  
 8 def doRequest():
 9     sock = socket.socket()
10     sock.connect(('www.baidu.com',80))
11     sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8"))
12     response = sock.recv(1024)
13     return response
14  
15 def main():
16     start = time.time()
17     threads = []
18     for i in range(int(sys.argv[1])):
19         # doRequest()
20         threads.append(threading.Thread(target=doRequest,args=()))
21     for i in threads:
22         i.start()
23     for i in threads:
24         i.join()
25     print("spend time : %s" %(time.time()-start))

使用執行緒之後,看一下請求10次和20次的時間:

1 ➜  python3 2.py  10
2 spend time : 0.1124269962310791
3 ➜ python3 2.py  20
4 spend time : 0.15438294410705566

速度明顯快了很多,幾乎是剛才的10倍了。

但是python的執行緒是有問題的,因為一個python程序中,同一時刻只允許一個執行緒執行,正在執行的執行緒會獲取到GPL。做阻塞的系統呼叫時,例如sock.connect(),sock.recv()時,當前執行緒會釋放GIL,讓別的執行緒有機會獲取GPL,然後執行。但是這種獲取GPL的排程策略是搶佔式的,以保證同等優先順序的執行緒都有均等的執行機會,那帶來的問題是:並不知道下一時刻是哪個執行緒被執行,也不知道它正要執行的程式碼是什麼。所以就可能存在競態條件。這種競爭有可能使某些執行緒處於劣勢,導致一直獲取不到GPL

比如如下的情況,執行緒1執行的程式碼如下:

1 flag = True
2 while flag:
3     .....  # 這裡省略一些複雜的操作,會呼叫多次IO操作
4     time.sleep(1)

可以看到,執行緒1的任務非常簡單,而執行緒2的任務非常複雜,這就會導致CPU不停地去執行執行緒1,而真正做實際工作的執行緒2卻很少被排程到,導致了浪費了大量的CPU資源。

3. 改進2-非阻塞方式

在第一個例子中,我們意識到浪費了大量的時間,是因為我們用了阻塞的IO,導致CPU在卡在那裡等待IO的就緒,那使用非阻塞的IO,是不是就可以解決這個問題了。

程式碼如下:

 1 import time 
 2 import socket
 3 import sys
 4  
 5 def doRequest():
 6     sock = socket.socket()
 7     sock.setblocking(False)
 8     try:
 9         sock.connect(('www.baidu.com',80))
10     except BlockingIOError:
11         pass   
12  
13     # 因為設定為非阻塞模式了,不知道何時socket就緒,需要不停的監控socket的狀態
14     while True:
15         try:
16             sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8"))
17             # 直到send 不丟擲異常,就傳送成功了 
18             break
19         except OSError:
20             pass
21     while True:
22         try:
23             response = sock.recv(1024)
24             break
25         except OSError:
26             pass
27     return response
28 def main():
29     start = time.time()
30     for i in range(int(sys.argv[1])):
31         doRequest()
32     print("spend time : %s" %(time.time()-start))
33  
34 main()

sock.setblocking(False)把socket設定為非阻塞式的,也就是說執行完sock.connect()sock.recv()之後,CPU不再等待IO了,會繼續往下執行,來看一下執行時間:

1 ➜  python3 3.py  10
2 spend time : 1.0597507953643799
3 ➜  python3 3.py  20
4 spend time : 2.0327329635620117

感覺被騙了,速度還是跟第一個一樣啊,看來非阻塞IO並沒有什麼速度上的提升啊,問題出在哪裡呢?看程式碼發現多了兩個while迴圈:

 1 while True:
 2     try:
 3         sock.send("GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n".encode("utf-8"))
 4         # 直到send 不丟擲異常,就傳送成功了 
 5         break
 6     except OSError:
 7         pass
 8 while True:
 9     try:
10         response = sock.recv(1024)
11         break
12     except OSError:
13         pass

因為把socket設定為非阻塞模式了,所以CPU並不知道IO什麼時候就緒,所以必須在這裡不停的嘗試,直到IO可以使用了為止。

雖然 connect() 和 recv() 不再阻塞主程式,空出來的時間段CPU沒有空閒著,但並沒有利用好這空閒去做其他有意義的事情,而是在迴圈嘗試讀寫 socket (不停判斷非阻塞呼叫的狀態是否就緒)。

有沒有辦法讓CPU空閒出來的時間,不用來不停的詢問IO,而是幹別的更有意義的事情呢,等IO就緒後再通知CPU回來處理呢?當然有了,那就是回撥。

4. 改進3-回撥

作業系統已經把IO狀態的改變封裝成了事件,如可讀事件、可寫事件。並且可以為這些事件繫結處理函式。所以我們可以使用這種方式,為socket的IO狀態的變化繫結處理函式,交給系統進行調動,這樣就是回撥方式。python的select模組支援這樣的操作。

程式碼如下:

 1 import socket
 2 from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
 3 import sys
 4 selector = DefaultSelector()
 5 stopped = False
 6 urls_todo = []
 7  
 8 class Crawler:
 9     def __init__(self, url):
10         self.url = url
11         self.sock = None
12         self.response = b''
13  
14     def fetch(self):
15         self.sock = socket.socket()
16         self.sock.setblocking(False)
17         try:
18             self.sock.connect(('www.baidu.com', 80))
19         except BlockingIOError:
20             pass
21         selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
22  
23     def connected(self, key, mask):
24         selector.unregister(key.fd)
25         get = 'GET {0} HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n'.format(self.url)
26         self.sock.send(get.encode('ascii'))
27         selector.register(key.fd, EVENT_READ, self.read_response)
28  
29     def read_response(self, key, mask):
30         global stopped
31         # 如果響應大於4KB,下一次迴圈會繼續讀
32         chunk = self.sock.recv(4096)
33         if chunk:
34             self.response += chunk
35         else:
36             selector.unregister(key.fd)
37             urls_todo.remove(self.url)
38             if not urls_todo:
39                 stopped = True
40  
41 def loop():
42     while not stopped:
43         # 阻塞, 直到一個事件發生
44         events = selector.select()
45         for event_key, event_mask in events:
46             callback = event_key.data
47             callback(event_key, event_mask)
48  
49 if __name__ == '__main__':
50     import time
51     start = time.time()
52     for i in range(int(sys.argv[1])):
53         urls_todo.append("/"+str(i))
54         crawler = Crawler("/"+str(i))
55         crawler.fetch()
56     loop()
57     print("spend time : %s" %(time.time()-start))  

監控socket的狀態,如果變為可寫的,就往裡面寫資料

selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

監控socket的狀態,如果變為可讀的,就外讀資料

selector.register(key.fd, EVENT_READ, self.read_response)

測試一下速度:

1 ➜  python3 4.py 10
2 spend time : 0.03910994529724121
3 ➜  python3 4.py 20
4 spend time : 0.04195284843444824

我們看到速度已經有個一個質的飛躍了,但是回撥用一些嚴重的問題,會破壞程式碼的本來的邏輯結構,造成程式碼可讀性很差。

比如我們有函式 funcA,funcB,funcC三個函式,如果funcC處理的結果依賴於funcB的處理結果,funcB的處理結果依賴於funcA的處理結果,而funcA又是回撥的方式呼叫的,所以就不知道funcA什麼時候返回,所以只能將後續的處理都作為callback的方式傳入funcA中,讓funcA執行完了可以執行funcB,funcB執行完了可以執行funcC,看起來像下面這樣:

funcA(funcB(funcC()))

這就形成了一個鏈式的回撥,跟最初的程式碼邏輯完全相反,本來的邏輯應該是這樣的。

funcC(funcB(funcA()))

因為這樣的鏈式回撥的出現,導致了理解程式碼邏輯困難,並且錯誤處理困難。

有沒有方法避免這種地獄式的鏈式回撥的呢?

5 .改進4-利用生成器

可以利用python的生成器,把發請求的函式寫成一個生成器,然後只監控IO的狀態,當IO狀態發生改變之後,就給生成器傳送值,驅動生成器進行下一步操作,這樣就可以避免回調了,具體實現如下:

 1 import select
 2 import socket
 3 import time
 4 import sys
 5  
 6 num = int(sys.argv[1])
 7  
 8 def coroutine():
 9     sock = socket.socket()
10     sock.setblocking(0)
11     address = yield sock
12     try:
13         sock.connect(address)
14     except BlockingIOError:
15         pass
16     data = yield
17     size = yield sock.send(data)
18     yield sock.recv(size)
19  
20 def main():
21     inputs = []
22     outputs = []
23     coros = []
24     coro_dict = dict()
25     for i in range(num):
26         coros.append(coroutine())
27         sock = coros[-1].send(None)   # 傳送一個None值來啟動生成器
28         outputs.append(sock.fileno()) # 
29         # print(outputs)
30         coro_dict[sock.fileno()] = coros[-1]
31         coros[-1].send(('www.baidu.com', 80))
32     while True:
33         r_list,w_list,e_list = select.select(inputs,outputs, ())
34         for i in w_list:
35             # print(type(i))
36             coro_dict[i].send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')
37             outputs.remove(i)
38             inputs.append(i)
39         for i in r_list:
40             coro_dict[i].send(1024)
41             inputs.remove(i)
42         if len(inputs) == len(outputs) == 0:
43             break  
44     # time.sleep(2)
45     # coro.send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')
46     # select.select(wait_list, (), ())
47     # print(coro.send(1024))
48  
49 start  = time.time()
50 main()
51 print("spend time : %s" %(time.time()-start))

可以看到把發起請求的函式寫成了一個生成器:

 1 def coroutine():
 2     sock = socket.socket()
 3     sock.setblocking(0)
 4     address = yield sock
 5     try:
 6         sock.connect(address)
 7     except BlockingIOError:
 8         pass
 9     data = yield
10     size = yield sock.send(data)
11     yield sock.recv(size)

然後監控IO狀態,當IO狀態發生改變之後,驅動生成器繼續執行。

 1 while True:
 2         r_list,w_list,e_list = select.select(inputs,outputs, ())
 3         for i in w_list:
 4             # print(type(i))
 5             coro_dict[i].send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')
 6             outputs.remove(i)
 7             inputs.append(i)
 8         for i in r_list:
 9             coro_dict[i].send(1024)
10             inputs.remove(i)

看一下程式執行時間:

1 ➜  python3 5.py 10
2 spend time : 0.058114051818847656
3 ➜  python3 5.py 20
4 spend time : 0.0949699878692627

效果貌似非常的棒啊,執行的太快了,但是當我執行300次請求的時候,我就發現問題了,返回的非常慢,。估計原因可能是select是順序遍歷每一個IO描述符的去做狀態檢查,當IO描述符過多的時候,會導致遍歷的速度比較慢,所以造成時間花費很大。