1. 程式人生 > >Tornado異步非阻塞的使用以及原理

Tornado異步非阻塞的使用以及原理

發送數據 star opened prope asd 等待 page 設置 使用

Tornado 和現在的主流 Web 服務器框架(包括大多數 Python 的框架)有著明顯的區別:它是非阻塞式服務器,而且速度相當快。得利於其 非阻塞的方式和對 epoll 的運用,Tornado 每秒可以處理數以千計的連接,這意味著對於實時 Web 服務來說,Tornado 是一個理想的 Web 框架。

一、Tornado的兩種模式使用

1.同步阻塞模式

由於doing中sleep10秒,此時其他連接將被阻塞,必須等這次請求完成後其他請求才能連接成功。

技術分享圖片
1 import tornado.ioloop
 2 import tornado.web
 3    
 4    
 5 class
MainHandler(tornado.web.RequestHandler): 6 def get(self): 7 self.doing() 8 self.write("Hello, world") 9 10 def doing(self): 11 time.sleep(10) 12 13 14 application = tornado.web.Application([ 15 (r"/index", MainHandler), 16 ]) 17 18 19 if __name__ == "
__main__": 20 application.listen(8888) 21 tornado.ioloop.IOLoop.instance().start()
同步阻塞

2.異步非阻塞模式

1、基本使用

裝飾器 + Future 從而實現Tornado的異步非阻塞:

當發送GET請求時,由於方法被@gen.coroutine裝飾且yield 一個 Future對象,那麽Tornado會等待,等待用戶向future對象中放置數據或者發送信號,如果獲取到數據或信號之後,就開始執行doing方法。異步非阻塞體現在當在Tornaod等待用戶向future對象中放置數據時,還可以處理其他請求。

註意:在等待用戶向future對象中放置數據或信號時,此連接是不斷開的。

技術分享圖片
 1 class AsyncHandler(tornado.web.RequestHandler):
 2     @gen.coroutine
 3     def get(self):
 4         future = Future()
 5         tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.doing)
 6         yield future
 7 
 8 
 9     def doing(self, *args, **kwargs):
10         self.write(async)
11         self.finish()
異步非阻塞

2、httpclient類庫

當服務器接到的請求需要向第三方服務器發送請求才能解決時,Tornado提供了httpclient類庫用於發送Http請求,其配合Tornado的異步非阻塞使用。

技術分享圖片
 1 1 import tornado.web
 2  2 from tornado import gen
 3  3 from tornado import httpclient
 4  4  
 5  5 # 方式一:
 6  6 class AsyncHandler(tornado.web.RequestHandler):
 7  7     @gen.coroutine
 8  8     def get(self, *args, **kwargs):
 9  9         print(進入)
10 10         http = httpclient.AsyncHTTPClient()
11 11         data = yield http.fetch("http://www.google.com")
12 12         print(完事,data)
13 13         self.finish(6666)
14 14  
15 15 # 方式二:
16 16 # class AsyncHandler(tornado.web.RequestHandler):
17 17 #     @gen.coroutine
18 18 #     def get(self):
19 19 #         print(‘進入‘)
20 20 #         http = httpclient.AsyncHTTPClient()
21 21 #         yield http.fetch("http://www.google.com", self.done)
22 22 #
23 23 #     def done(self, response):
24 24 #         print(‘完事‘)
25 25 #         self.finish(‘666‘)
26 26  
27 27  
28 28  
29 29 application = tornado.web.Application([
30 30     (r"/async", AsyncHandler),
31 31 ])
32 32  
33 33 if __name__ == "__main__":
34 34     application.listen(8888)
35 35     tornado.ioloop.IOLoop.instance().start() 
異步發送request請求

二、Tornado異步非阻塞的原理

1、普通同步阻塞服務器框架原理

通過select與socket我們可以開發一個微型的框架,使用select實現IO多路復用監聽本地服務端socket。當有客戶端發送請求時,select監聽的本地socket發生變化,通過socket.accept()得到客戶端發送來的conn(也是一個socket),並將conn也添加到select監聽列表裏。當客戶端通過conn發送數據時,服務端select監聽列表的conn發生變化,我們將conn發送的數據(請求數據)接收保存並處理得到request_header與request_body,然後可以根據request_header中的url來匹配本地路由中的url,然後得到對應的view函數,然後將view的返回值(一般為字符串)通過conn發送回請求客戶端,然後將conn關閉,並且移除select監聽列表中的conn,這樣一次網絡IO請求便算結束。

技術分享圖片
  1 import socket
  2 import select
  3 
  4 class HttpRequest(object):
  5     """
  6     用戶封裝用戶請求信息
  7     """
  8     def __init__(self, content):
  9         """
 10 
 11         :param content:用戶發送的請求數據:請求頭和請求體
 12         """
 13         self.content = content
 14 
 15         self.header_bytes = bytes()
 16         self.body_bytes = bytes()
 17 
 18         self.header_dict = {}
 19 
 20         self.method = ""
 21         self.url = ""
 22         self.protocol = ""
 23 
 24         self.initialize()
 25         self.initialize_headers()
 26 
 27     def initialize(self):
 28 
 29         temp = self.content.split(b\r\n\r\n, 1)
 30         if len(temp) == 1:
 31             self.header_bytes += temp
 32         else:
 33             h, b = temp
 34             self.header_bytes += h
 35             self.body_bytes += b
 36 
 37     @property
 38     def header_str(self):
 39         return str(self.header_bytes, encoding=utf-8)
 40 
 41     def initialize_headers(self):
 42         headers = self.header_str.split(\r\n)
 43         first_line = headers[0].split( )
 44         if len(first_line) == 3:
 45             self.method, self.url, self.protocol = headers[0].split( )
 46             for line in headers:
 47                 kv = line.split(:)
 48                 if len(kv) == 2:
 49                     k, v = kv
 50                     self.header_dict[k] = v
 51 
 52 # class Future(object):
 53 #     def __init__(self):
 54 #         self.result = None
 55 
 56 def main(request):
 57     return "main"
 58 
 59 def index(request):
 60     return "indexasdfasdfasdf"
 61 
 62 
 63 routers = [
 64     (/main/,main),
 65     (/index/,index),
 66 ]
 67 
 68 def run():
 69     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 70     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 71     sock.bind(("127.0.0.1", 9999,))
 72     sock.setblocking(False)
 73     sock.listen(128)
 74 
 75     inputs = []
 76     inputs.append(sock)
 77     while True:
 78         rlist,wlist,elist = select.select(inputs,[],[],0.05)
 79         for r in rlist:
 80             if r == sock:
 81                 """新請求到來"""
 82                 conn,addr = sock.accept()
 83                 conn.setblocking(False)
 84                 inputs.append(conn)
 85             else:
 86                 """客戶端發來數據"""
 87                 data = b""
 88                 while True:
 89                     try:
 90                         chunk = r.recv(1024)
 91                         data = data + chunk
 92                     except Exception as e:
 93                         chunk = None
 94                     if not chunk:
 95                         break
 96                 # data進行處理:請求頭和請求體
 97                 request = HttpRequest(data)
 98                 # 1. 請求頭中獲取url
 99                 # 2. 去路由中匹配,獲取指定的函數
100                 # 3. 執行函數,獲取返回值
101                 # 4. 將返回值 r.sendall(b‘alskdjalksdjf;asfd‘)
102                 import re
103                 flag = False
104                 func = None
105                 for route in routers:
106                     if re.match(route[0],request.url):
107                         flag = True
108                         func = route[1]
109                         break
110                 if flag:
111                     result = func(request)
112                     r.sendall(bytes(result,encoding=utf-8))
113                 else:
114                     r.sendall(b"404")
115 
116                 inputs.remove(r)
117                 r.close()
118 
119 if __name__ == __main__:
120     run()
自定義同步阻塞框架

2、Tornado異步非阻塞實現原理

tornado通過裝飾器 + Future 從而實現異步非阻塞。在view中yield一個future對象,然後再在發送相應數據前判斷view函數返回來的數據類型,如果是字符串類型直接返回,如果是future對象,則將返回來的future對象添加到async_request_dict中,先不給客戶端返回響應數據(此時可以處理其他客戶端的連接請求),等future對象的result有值時再返回,還可以設置超時時間,在規定的時間過後返回響應數據。 !! 關鍵是future對象,future對象裏有result屬性,默認為None,當result有值時再返回數據。

技術分享圖片
  1 import socket
  2 import select
  3 import time
  4 
  5 class HttpRequest(object):
  6     """
  7     用戶封裝用戶請求信息
  8     """
  9     def __init__(self, content):
 10         """
 11 
 12         :param content:用戶發送的請求數據:請求頭和請求體
 13         """
 14         self.content = content
 15 
 16         self.header_bytes = bytes()
 17         self.body_bytes = bytes()
 18 
 19         self.header_dict = {}
 20 
 21         self.method = ""
 22         self.url = ""
 23         self.protocol = ""
 24 
 25         self.initialize()
 26         self.initialize_headers()
 27 
 28     def initialize(self):
 29 
 30         temp = self.content.split(b\r\n\r\n, 1)
 31         if len(temp) == 1:
 32             self.header_bytes += temp
 33         else:
 34             h, b = temp
 35             self.header_bytes += h
 36             self.body_bytes += b
 37 
 38     @property
 39     def header_str(self):
 40         return str(self.header_bytes, encoding=utf-8)
 41 
 42     def initialize_headers(self):
 43         headers = self.header_str.split(\r\n)
 44         first_line = headers[0].split( )
 45         if len(first_line) == 3:
 46             self.method, self.url, self.protocol = headers[0].split( )
 47             for line in headers:
 48                 kv = line.split(:)
 49                 if len(kv) == 2:
 50                     k, v = kv
 51                     self.header_dict[k] = v
 52 
 53 class Future(object):
 54     def __init__(self,timeout=0):
 55         self.result = None
 56         self.timeout = timeout
 57         self.start = time.time()
 58 def main(request):
 59     f = Future(5)
 60     return f
 61 
 62 def index(request):
 63     return "indexasdfasdfasdf"
 64 
 65 
 66 routers = [
 67     (/main/,main),
 68     (/index/,index),
 69 ]
 70 
 71 def run():
 72     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 73     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 74     sock.bind(("127.0.0.1", 9999,))
 75     sock.setblocking(False)
 76     sock.listen(128)
 77 
 78     inputs = []
 79     inputs.append(sock)
 80 
 81     async_request_dict = {
 82         # ‘socket‘: futrue
 83     }
 84 
 85     while True:
 86         rlist,wlist,elist = select.select(inputs,[],[],0.05)
 87         for r in rlist:
 88             if r == sock:
 89                 """新請求到來"""
 90                 conn,addr = sock.accept()
 91                 conn.setblocking(False)
 92                 inputs.append(conn)
 93             else:
 94                 """客戶端發來數據"""
 95                 data = b""
 96                 while True:
 97                     try:
 98                         chunk = r.recv(1024)
 99                         data = data + chunk
100                     except Exception as e:
101                         chunk = None
102                     if not chunk:
103                         break
104                 # data進行處理:請求頭和請求體
105                 request = HttpRequest(data)
106                 # 1. 請求頭中獲取url
107                 # 2. 去路由中匹配,獲取指定的函數
108                 # 3. 執行函數,獲取返回值
109                 # 4. 將返回值 r.sendall(b‘alskdjalksdjf;asfd‘)
110                 import re
111                 flag = False
112                 func = None
113                 for route in routers:
114                     if re.match(route[0],request.url):
115                         flag = True
116                         func = route[1]
117                         break
118                 if flag:
119                     result = func(request)
120                     if isinstance(result,Future):
121                         async_request_dict[r] = result
122                     else:
123                         r.sendall(bytes(result,encoding=utf-8))
124                         inputs.remove(r)
125                         r.close()
126                 else:
127                     r.sendall(b"404")
128                     inputs.remove(r)
129                     r.close()
130 
131         for conn in async_request_dict.keys():
132             future = async_request_dict[conn]
133             start = future.start
134             timeout = future.timeout
135             ctime = time.time()
136             if (start + timeout) <= ctime :
137                 future.result = b"timeout"
138             if future.result:
139                 conn.sendall(future.result)
140                 conn.close()
141                 del async_request_dict[conn]
142                 inputs.remove(conn)
143 
144 if __name__ == __main__:
145     run()
自定義異步非阻塞框架

  
  

Tornado異步非阻塞的使用以及原理