Tornado異步非阻塞的使用以及原理
Tornado 和現在的主流 Web 服務器框架(包括大多數 Python 的框架)有著明顯的區別:它是非阻塞式服務器,而且速度相當快。得利於其 非阻塞的方式和對 epoll 的運用,Tornado 每秒可以處理數以千計的連接,這意味著對於實時 Web 服務來說,Tornado 是一個理想的 Web 框架。
一、Tornado的兩種模式使用
1.同步阻塞模式
由於doing中sleep10秒,此時其他連接將被阻塞,必須等這次請求完成後其他請求才能連接成功。
1 import tornado.ioloop 2 import tornado.web 3 4 5 class同步阻塞MainHandler(tornado.web.RequestHandler): 6 def get(self): 7 self.doing() 8 self.write("Hello, world") 9 10 def doing(self): 11 time.sleep(10) 12 13 14 application = tornado.web.Application([ 15 (r"/index", MainHandler), 16 ]) 17 18 19 if __name__ == "__main__": 20 application.listen(8888) 21 tornado.ioloop.IOLoop.instance().start()
2.異步非阻塞模式
1、基本使用
裝飾器 + Future 從而實現Tornado的異步非阻塞:
當發送GET請求時,由於方法被@gen.coroutine裝飾且yield 一個 Future對象,那麽Tornado會等待,等待用戶向future對象中放置數據或者發送信號,如果獲取到數據或信號之後,就開始執行doing方法。異步非阻塞體現在當在Tornaod等待用戶向future對象中放置數據時,還可以處理其他請求。
註意:在等待用戶向future對象中放置數據或信號時,此連接是不斷開的。
1 class AsyncHandler(tornado.web.RequestHandler): 2 @gen.coroutine 3 def get(self): 4 future = Future() 5 tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.doing) 6 yield future 7 8 9 def doing(self, *args, **kwargs): 10 self.write(‘async‘) 11 self.finish()異步非阻塞
2、httpclient類庫
當服務器接到的請求需要向第三方服務器發送請求才能解決時,Tornado提供了httpclient類庫用於發送Http請求,其配合Tornado的異步非阻塞使用。
1 1 import tornado.web 2 2 from tornado import gen 3 3 from tornado import httpclient 4 4 5 5 # 方式一: 6 6 class AsyncHandler(tornado.web.RequestHandler): 7 7 @gen.coroutine 8 8 def get(self, *args, **kwargs): 9 9 print(‘進入‘) 10 10 http = httpclient.AsyncHTTPClient() 11 11 data = yield http.fetch("http://www.google.com") 12 12 print(‘完事‘,data) 13 13 self.finish(‘6666‘) 14 14 15 15 # 方式二: 16 16 # class AsyncHandler(tornado.web.RequestHandler): 17 17 # @gen.coroutine 18 18 # def get(self): 19 19 # print(‘進入‘) 20 20 # http = httpclient.AsyncHTTPClient() 21 21 # yield http.fetch("http://www.google.com", self.done) 22 22 # 23 23 # def done(self, response): 24 24 # print(‘完事‘) 25 25 # self.finish(‘666‘) 26 26 27 27 28 28 29 29 application = tornado.web.Application([ 30 30 (r"/async", AsyncHandler), 31 31 ]) 32 32 33 33 if __name__ == "__main__": 34 34 application.listen(8888) 35 35 tornado.ioloop.IOLoop.instance().start()異步發送request請求
二、Tornado異步非阻塞的原理
1、普通同步阻塞服務器框架原理
通過select與socket我們可以開發一個微型的框架,使用select實現IO多路復用監聽本地服務端socket。當有客戶端發送請求時,select監聽的本地socket發生變化,通過socket.accept()得到客戶端發送來的conn(也是一個socket),並將conn也添加到select監聽列表裏。當客戶端通過conn發送數據時,服務端select監聽列表的conn發生變化,我們將conn發送的數據(請求數據)接收保存並處理得到request_header與request_body,然後可以根據request_header中的url來匹配本地路由中的url,然後得到對應的view函數,然後將view的返回值(一般為字符串)通過conn發送回請求客戶端,然後將conn關閉,並且移除select監聽列表中的conn,這樣一次網絡IO請求便算結束。
1 import socket 2 import select 3 4 class HttpRequest(object): 5 """ 6 用戶封裝用戶請求信息 7 """ 8 def __init__(self, content): 9 """ 10 11 :param content:用戶發送的請求數據:請求頭和請求體 12 """ 13 self.content = content 14 15 self.header_bytes = bytes() 16 self.body_bytes = bytes() 17 18 self.header_dict = {} 19 20 self.method = "" 21 self.url = "" 22 self.protocol = "" 23 24 self.initialize() 25 self.initialize_headers() 26 27 def initialize(self): 28 29 temp = self.content.split(b‘\r\n\r\n‘, 1) 30 if len(temp) == 1: 31 self.header_bytes += temp 32 else: 33 h, b = temp 34 self.header_bytes += h 35 self.body_bytes += b 36 37 @property 38 def header_str(self): 39 return str(self.header_bytes, encoding=‘utf-8‘) 40 41 def initialize_headers(self): 42 headers = self.header_str.split(‘\r\n‘) 43 first_line = headers[0].split(‘ ‘) 44 if len(first_line) == 3: 45 self.method, self.url, self.protocol = headers[0].split(‘ ‘) 46 for line in headers: 47 kv = line.split(‘:‘) 48 if len(kv) == 2: 49 k, v = kv 50 self.header_dict[k] = v 51 52 # class Future(object): 53 # def __init__(self): 54 # self.result = None 55 56 def main(request): 57 return "main" 58 59 def index(request): 60 return "indexasdfasdfasdf" 61 62 63 routers = [ 64 (‘/main/‘,main), 65 (‘/index/‘,index), 66 ] 67 68 def run(): 69 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 70 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 71 sock.bind(("127.0.0.1", 9999,)) 72 sock.setblocking(False) 73 sock.listen(128) 74 75 inputs = [] 76 inputs.append(sock) 77 while True: 78 rlist,wlist,elist = select.select(inputs,[],[],0.05) 79 for r in rlist: 80 if r == sock: 81 """新請求到來""" 82 conn,addr = sock.accept() 83 conn.setblocking(False) 84 inputs.append(conn) 85 else: 86 """客戶端發來數據""" 87 data = b"" 88 while True: 89 try: 90 chunk = r.recv(1024) 91 data = data + chunk 92 except Exception as e: 93 chunk = None 94 if not chunk: 95 break 96 # data進行處理:請求頭和請求體 97 request = HttpRequest(data) 98 # 1. 請求頭中獲取url 99 # 2. 去路由中匹配,獲取指定的函數 100 # 3. 執行函數,獲取返回值 101 # 4. 將返回值 r.sendall(b‘alskdjalksdjf;asfd‘) 102 import re 103 flag = False 104 func = None 105 for route in routers: 106 if re.match(route[0],request.url): 107 flag = True 108 func = route[1] 109 break 110 if flag: 111 result = func(request) 112 r.sendall(bytes(result,encoding=‘utf-8‘)) 113 else: 114 r.sendall(b"404") 115 116 inputs.remove(r) 117 r.close() 118 119 if __name__ == ‘__main__‘: 120 run()自定義同步阻塞框架
2、Tornado異步非阻塞實現原理
tornado通過裝飾器 + Future 從而實現異步非阻塞。在view中yield一個future對象,然後再在發送相應數據前判斷view函數返回來的數據類型,如果是字符串類型直接返回,如果是future對象,則將返回來的future對象添加到async_request_dict中,先不給客戶端返回響應數據(此時可以處理其他客戶端的連接請求),等future對象的result有值時再返回,還可以設置超時時間,在規定的時間過後返回響應數據。 !! 關鍵是future對象,future對象裏有result屬性,默認為None,當result有值時再返回數據。
1 import socket 2 import select 3 import time 4 5 class HttpRequest(object): 6 """ 7 用戶封裝用戶請求信息 8 """ 9 def __init__(self, content): 10 """ 11 12 :param content:用戶發送的請求數據:請求頭和請求體 13 """ 14 self.content = content 15 16 self.header_bytes = bytes() 17 self.body_bytes = bytes() 18 19 self.header_dict = {} 20 21 self.method = "" 22 self.url = "" 23 self.protocol = "" 24 25 self.initialize() 26 self.initialize_headers() 27 28 def initialize(self): 29 30 temp = self.content.split(b‘\r\n\r\n‘, 1) 31 if len(temp) == 1: 32 self.header_bytes += temp 33 else: 34 h, b = temp 35 self.header_bytes += h 36 self.body_bytes += b 37 38 @property 39 def header_str(self): 40 return str(self.header_bytes, encoding=‘utf-8‘) 41 42 def initialize_headers(self): 43 headers = self.header_str.split(‘\r\n‘) 44 first_line = headers[0].split(‘ ‘) 45 if len(first_line) == 3: 46 self.method, self.url, self.protocol = headers[0].split(‘ ‘) 47 for line in headers: 48 kv = line.split(‘:‘) 49 if len(kv) == 2: 50 k, v = kv 51 self.header_dict[k] = v 52 53 class Future(object): 54 def __init__(self,timeout=0): 55 self.result = None 56 self.timeout = timeout 57 self.start = time.time() 58 def main(request): 59 f = Future(5) 60 return f 61 62 def index(request): 63 return "indexasdfasdfasdf" 64 65 66 routers = [ 67 (‘/main/‘,main), 68 (‘/index/‘,index), 69 ] 70 71 def run(): 72 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 73 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 74 sock.bind(("127.0.0.1", 9999,)) 75 sock.setblocking(False) 76 sock.listen(128) 77 78 inputs = [] 79 inputs.append(sock) 80 81 async_request_dict = { 82 # ‘socket‘: futrue 83 } 84 85 while True: 86 rlist,wlist,elist = select.select(inputs,[],[],0.05) 87 for r in rlist: 88 if r == sock: 89 """新請求到來""" 90 conn,addr = sock.accept() 91 conn.setblocking(False) 92 inputs.append(conn) 93 else: 94 """客戶端發來數據""" 95 data = b"" 96 while True: 97 try: 98 chunk = r.recv(1024) 99 data = data + chunk 100 except Exception as e: 101 chunk = None 102 if not chunk: 103 break 104 # data進行處理:請求頭和請求體 105 request = HttpRequest(data) 106 # 1. 請求頭中獲取url 107 # 2. 去路由中匹配,獲取指定的函數 108 # 3. 執行函數,獲取返回值 109 # 4. 將返回值 r.sendall(b‘alskdjalksdjf;asfd‘) 110 import re 111 flag = False 112 func = None 113 for route in routers: 114 if re.match(route[0],request.url): 115 flag = True 116 func = route[1] 117 break 118 if flag: 119 result = func(request) 120 if isinstance(result,Future): 121 async_request_dict[r] = result 122 else: 123 r.sendall(bytes(result,encoding=‘utf-8‘)) 124 inputs.remove(r) 125 r.close() 126 else: 127 r.sendall(b"404") 128 inputs.remove(r) 129 r.close() 130 131 for conn in async_request_dict.keys(): 132 future = async_request_dict[conn] 133 start = future.start 134 timeout = future.timeout 135 ctime = time.time() 136 if (start + timeout) <= ctime : 137 future.result = b"timeout" 138 if future.result: 139 conn.sendall(future.result) 140 conn.close() 141 del async_request_dict[conn] 142 inputs.remove(conn) 143 144 if __name__ == ‘__main__‘: 145 run()自定義異步非阻塞框架
Tornado異步非阻塞的使用以及原理