150行程式碼搭建非同步非阻塞Web框架
最近看Tornado原始碼給了我不少啟發,心血來潮決定自己試著只用python標準庫來實現一個非同步非阻塞web框架。花了點時間感覺還可以,一百多行的程式碼已經可以撐起一個極簡框架了。
一、準備工作
需要的相關知識點:
- HTTP協議的請求和響應
- IO多路複用
- asyncio
掌握上面三個點的知識就完全沒有問題,不是很清楚的同學我也推薦幾篇參考文章
HTTP協議詳細介紹(https://www.cnblogs.com/haiyan123/p/7777924.html)
Python篇-IO多路複用詳解(https://www.jianshu.com/p/818f27379a5e)
Python非同步IO之協程(一):從yield from到async的使用(https://blog.csdn.net/SL_World/article/details/86597738)
實驗環境:
python 3.7.3
由於在框架中會使用到async/await關鍵字,所以只要確保python版本在3.5以上即可。
二、框架功能目標
我們的框架要實現最基本的幾個功能:
- 封裝HTTP請求響應
- 路由對映
- 類檢視和函式檢視
- 協程支援
當然一個完善的web框架需要實現的遠遠不止這些,這裡我們現在只需要它能跑起來就足夠了。
三、封裝HTTP協議
HTTP是基於TCP/IP通訊協議來實現資料傳輸,與一般的C/S相比,它的特點在於當客戶端(瀏覽器)向服務端發起HTTP請求,服務端響應資料後雙方立馬斷開連線,服務端無法主動向客戶端傳送資料。HTTP協議資料傳輸內容分為請求頭和請求體,請求頭和請求體之間使用"\r\n\r\n"進行分隔。在請求頭中,第一行包含了請求方式,請求路徑和HTTP協議,此後每一行以key: value的形式傳輸資料。
對於我們的web服務端來說,需要的就是解析http請求和處理http響應。
我們通過寫兩個類,HttpRequest和HttpResponse來實現。
3.1 HttpRequest
HttpRequest設計目標是解析從socket接收request資料
1 class HttpRequest(object): 2 def __init__(self, content: bytes): 3 self.content = content.decode('utf-8') 4 self.headers = {} 5 self.GET = {} 6 self.url = '' 7 self.full_path = '' 8 self.body = '' 9 try: 10 header, self.body = self.content.split('\r\n\r\n') 11 temp = header.split('\r\n') 12 first_line = temp.pop(0) 13 self.method, self.url, self.protocol = first_line.split(' ') 14 self.full_path = self.url 15 for t in temp: 16 k, v = t.split(': ', 1) 17 self.headers[k] = v 18 except Exception as e: 19 print(e) 20 if len(self.url.split('?')) > 1: # 解析GET引數 21 self.url = self.full_path.split('?')[0] # 把url中攜帶的引數去掉 22 parms = self.full_path.split('?')[1].split('&') 23 for p in parms: # 將GET引數新增到self.GET字典 24 k, v = p.split('=') 25 self.GET[k] = v
在類中,我們實現解析http請求的headers、method、url和GET引數,其實還有很多事情沒有做,比如使用POST傳輸資料時,資料是在請求體中,針對這部分內容我並沒有開始寫,原因在於本文主要目的還是非同步非阻塞框架,目前的功能已經足以支援我們進行下一步實驗了。
3.2 HttpResponse
HTTP響應也可以分為響應頭和響應體,我們可以很簡單的實現一個response:
1 class HttpResponse(object): 2 def __init__(self, data: str): 3 self.status_code = 200 # 預設響應狀態 200 4 self.headers = 'HTTP/1.1 %s OK\r\n' 5 self.headers += 'Server:AsyncWeb' 6 self.headers += '\r\n\r\n' 7 self.data = data 8 9 @property 10 def content(self): 11 return bytes((self.headers + self.data) % self.status_code, encoding='utf8')
HttpResponse中並沒有做太多的事情,接受一個字串,並使用content返回一個滿足HTTP響應格式的bytes。
從使用者呼叫角度,可以使用return HttpResponse("歡迎來到AsynicWeb")來返回資料。
我們也可以簡單的定義一個404頁面:
Http404 = HttpResponse('<html><h1>404</h1></html>') Http404.status_code = 404
四、路由對映
路由對映簡單理解就是從一個URL地址找到對應的邏輯函式。舉個例子,我們訪問http://127.0.0.1:8000這個頁面,在http請求中它的url是"/",在web伺服器中有一個函式index,web伺服器能夠由url地址"/"找到函式index,這就是一個路由對映。
其實路由對映實現起來非常簡單。我們只要定義一個對映列表,列表中的每個元素包含url和邏輯處理(檢視函式)兩部分,當一個http請求到達的時候,遍歷對映列表,使用正則匹配每一個url,如果請求的url和對映表中的相同,我們就可以取出對應的檢視函式。
路由對映表是完全由使用者來定義對映關係的,它應該使用一個我們定義的標準結構,比如:
routers = [ ('/$', IndexView), ('/home', asy) ]
五、類檢視和函式檢視
檢視是指能夠根據一個請求,執行某些邏輯運算,最終返回響應的模組。說到這裡,一個web框架的執行流程就出來了:
http請求——路由對映表——檢視——執行檢視獲取返回值——http響應
在我們的框架中,借鑑Django的設計,我們讓它支援類檢視(CBV)和函式檢視(FBV)兩種模式。
對於函式檢視,完全由使用者自己定義,只要至少能夠接受一個request引數即可
對於類檢視,我們需要做一些預處理,確保使用者按我們的規則來實現類檢視。
定義一個View類:
1 class View(object): 2 # CBV應繼承View類 3 def dispatch(self, request): 4 method = request.method.lower() 5 if hasattr(self, method): 6 return getattr(self, method)(request) 7 else: 8 return Http404
在View類中,我們只寫了一個dispatch方法,其實就做了一件事:反射。當我們在路由對映表中找對應的檢視時,如果判斷檢視屬於類,我們就呼叫dispatch方法。
從使用者角度來看,實現一個CBV只需要繼承View類,然後通過定義get、post、delete等方法來實現不同的處理。
六、socket和多路複用
上面幾個小節實現了web框架的大體執行路徑,從這節開始我們實現web伺服器的核心。
通過IO多路複用可以達到單執行緒實現高併發的效果,一個標準的IO多路複用寫法:
1 server = socket(AF_INET, SOCK_STREAM) 2 server.bind(("127.0.0.1", 8000)) 3 server.setblocking(False) # 設定非阻塞 4 server.listen(128) 5 Future_Task_Wait = {} 6 rlist = [server, ] 7 while True: 8 r, w, x = select.select(rlist, [], [], 0.1) 9 for o in r: 10 if o == server: 11 '''判斷o是server還是conn''' 12 conn, addr = o.accept() 13 conn.setblocking(False) # 設定非阻塞 14 rlist.append(conn) # 客戶連線 加入輪詢列表 15 else: 16 data = b"" 17 while True: # 接收客戶傳輸資料 18 try: 19 chunk = o.recv(1024) 20 data = data + chunk 21 except Exception as e: 22 chunk = None 23 if not chunk: 24 break 25 dosomething(o, data, routers) # 拿到資料乾點啥
通過這段程式碼我們可以獲得所有的請求了,下一步就是處理這些請求。
我們就定義一個dosomething函式
1 import re 2 import time 3 from types import FunctionType 4 5 def dosomething(o, data, routers): 6 '''解析http請求,尋找對映函式並執行得到結果
7 :param o: socket連線物件 8 :param data: socket接收資料 9 :return: 響應結果 10 ''' 11 request = HttpRequest(data) 12 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 13 request.method, request.url) 14 flag = False 15 for router in routers: 16 if re.match(router[0], request.url): 17 target = router[1] 18 flag = True 19 break 20 if flag: 21 # 判斷targe是函式還是類 22 if isinstance(target, FunctionType): 23 result = target(request) 24 elif issubclass(target, View): 25 result = target().dispatch(request) 26 else: 27 result = Http404 28 else: 29 result = Http404 30 return result
這段程式碼做了這麼幾件事。1.例項化HttpRequest;2.使用正則遍歷路由對映表;3.將request傳入檢視函式或類檢視的dispatch方法;4.拿到result結果
我們通過result = dosomething(o, data, routers)可以拿到結果,接下來我們只需要把結果發回給客戶端並斷開連線就可以了
o.sendall(result.content) # 由於result是一個HttpResponse物件 我們使用content屬性 rlist.remove(o) # 從輪詢中刪除連線 o.close() # 關閉連線
至此,我們的web框架已經搭建好了。
但它還是一個同步的框架,在我們的服務端中,其實一直通過while迴圈在監聽select是否變化,假如我們在檢視函式中新增IO操作,其他連線依然會阻塞等待,接下來讓我們的框架實現對協程的支援。
七、協程支援
在實現協程之前,我們先聊聊Tornado的Future物件。可以說Tornado非同步非阻塞的實現核心就是Future。
Future物件內部維護了一個重要屬性_result,這是一個標記位,一個剛例項化的Future內部的_result=None,我們可以通過其他操作來更改_result的狀態。另一方面,我們可以一直監聽每個Future物件的_result狀態,如果發生變化就執行某些特定的操作。
我們在第六節定義的dosomething函式中拿到了一個result,它應當是一個HttpResponse物件,那麼能不能返回一個Future物件呢。
假如result是一個Future物件,我們的服務端不立馬返回結果,而是把Future放進另一個輪詢列表中,當Future內的_result改變時再返回結果,就達到了非同步的效果。
我們也可以定義一個Future類,這個類維護只一個變數result:
1 class Future(object): 2 def __init__(self): 3 self.result = None
對於框架使用者來說,在檢視函式要麼返回一個HttpResponse物件代表立即返回,要麼返回一個Future物件說你先別管我,我把事情幹完了再通知你返回結果。
既然檢視函式返回的可能不只是HttpResponse物件,那麼我們就需要對第六步的程式碼增加額外的處理:
Future_Task_Wait = {} # 定義一個非同步Future字典 result = dosomething() # 拿到結果後執行下面判斷 if isinstance(result, Future): Future_Task_Wait[o] = result # Futre物件則加入字典 else: o.sendall(result.content) # 非Future物件直接返回結果並斷開連線 rlist.remove(o) o.close()
在while True輪詢內再增加一段程式碼,遍歷Future_Task_Wait字典:
rm_conn = [] # 需要移除列表的conn for conn, future in Future_Task_Wait.items(): if future.result: try: conn.sendall(HttpResponse(data=future.result).content) # 返回result finally: rlist.remove(conn) conn.close() rm_conn.append(conn) for conn in rm_conn: # 在字典中刪除conn del Future_Task_Wait[conn]
這樣,我們就可以返回一個Future來告訴伺服器這是將來才返回的物件。
那回歸正題,我們到底該如何使用協程?這裡我用的方法是建立一個子執行緒來執行協程事件迴圈,主執行緒永遠在監聽socket。
from threading import Thread def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() coroutine_loop = asyncio.new_event_loop() # 建立協程事件迴圈 run_loop_thread = Thread(target=start_loop, args=(coroutine_loop,)) # 新起執行緒執行事件迴圈, 防止阻塞主執行緒 run_loop_thread.start() # 執行執行緒,即執行協程事件迴圈
當我們要把asyncdo方法新增作為協程任務時
asyncio.run_coroutine_threadsafe(asyncdo(), coroutine_loop)
好了,非同步非阻塞的核心程式碼分析的差不多了,將六七節的程式碼整合寫成一個類
1 import re 2 import time 3 import select 4 import asyncio 5 from socket import * 6 from threading import Thread 7 from types import FunctionType 8 from http.response import Http404, HttpResponse 9 from http.request import HttpRequest 10 from views import View 11 from core.future import Future 12 13 class App(object): 14 # web應用程式 15 coroutine_loop = None 16 17 def __new__(cls, *args, **kwargs): 18 # 使用單例模式 19 if not hasattr(cls, '_instance'): 20 App._instance = super().__new__(cls) 21 return App._instance 22 23 def listen(self, host, port, routers): 24 # IO多路複用監聽連線 25 server = socket(AF_INET, SOCK_STREAM) 26 server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 27 server.bind((host, port)) 28 server.setblocking(False) 29 server.listen(128) 30 Future_Task_Wait = {} 31 rlist = [server, ] 32 while True: 33 r, w, x = select.select(rlist, [], [], 0.01) 34 for o in r: 35 if o == server: 36 '''判斷o是server還是conn''' 37 conn, addr = o.accept() 38 conn.setblocking(False) 39 rlist.append(conn) 40 else: 41 data = b"" 42 while True: 43 try: 44 chunk = o.recv(1024) 45 data = data + chunk 46 except Exception as e: 47 chunk = None 48 if not chunk: 49 break 50 try: 51 request = HttpRequest(data, o) 52 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 53 request.method, request.url) 54 flag = False 55 for router in routers: 56 if re.match(router[0], request.url): 57 target = router[1] 58 flag = True 59 break 60 if flag: 61 # 判斷targe是函式還是類 62 if isinstance(target, FunctionType): 63 result = target(request) 64 elif issubclass(target, View): 65 result = target().dispatch(request) 66 else: 67 result = Http404 68 else: 69 result = Http404 70 # 判斷result是不是future 71 if isinstance(result, Future): 72 Future_Task_Wait[o] = result 73 else: 74 o.sendall(result.content) 75 rlist.remove(o) 76 o.close() 77 except Exception as e: 78 print(e) 79 rm_conn = [] 80 for conn, future in Future_Task_Wait.items(): 81 if future.result: 82 try: 83 conn.sendall(HttpResponse(data=future.result).content) 84 finally: 85 rlist.remove(conn) 86 conn.close() 87 rm_conn.append(conn) 88 for conn in rm_conn: 89 del Future_Task_Wait[conn] 90 91 def run(self, host='127.0.0.1', port=8000, routers=()): 92 # 主執行緒select多路複用,處理http請求和響應 93 # 給協程單獨建立一個子執行緒,負責處理View函式提交的協程 94 def start_loop(loop): 95 asyncio.set_event_loop(loop) 96 loop.run_forever() 97 self.coroutine_loop = asyncio.new_event_loop() # 建立協程事件迴圈 98 run_loop_thread = Thread(target=start_loop, args=(self.coroutine_loop,)) # 新起執行緒執行事件迴圈, 防止阻塞主執行緒 99 run_loop_thread.start() # 執行執行緒,即執行協程事件迴圈 100 self.listen(host, port, routers)
八、框架測試
現在,可以測試我們的web框架了。
1 import asyncio 2 from core.server import App 3 from views import View 4 from http.response import * 5 from core.future import Future 6 7 8 class IndexView(View): 9 def get(self, request): 10 return HttpResponse('歡迎來到首頁') 11 12 def post(self, request): 13 return HttpResponse('post') 14 15 def asy(request): 16 future = Future() 17 print('非同步呼叫') 18 wait = request.url.split('/')[-1] 19 try: 20 wait = int(wait) 21 except: 22 wait = 5 23 asyncio.run_coroutine_threadsafe(dosomething(future, wait), app.coroutine_loop) 24 print('返回Future') 25 return future 26 27 async def dosomething(future, wait): 28 # 非同步函式 29 await asyncio.sleep(wait)# 模擬非同步操作 30 future.result = '等待了%s秒' % wait 31 32 routers = [ 33 ('/$', IndexView), 34 ('/home', asy) 35 ] 36 37 # 從使用者角度只需使用run() 38 app = App() 39 app.run('127.0.0.1', 8080, routers=routers)
瀏覽器訪問http://127.0.0.1:8080,返回沒有問題,如果有同學使用Chrome可能會亂碼,那是因為我們的HttpResponse沒有返回指定編碼,新增一個響應頭即可。
瀏覽器訪問http://127.0.0.1:8080/home,這時候會執行協程,預設等待5s後返回結果,你可以在多個標籤頁訪問這個地址,通過等待時間來驗證我們的非同步框架是否正常工作。
九、其他
至此,我們要實現的非同步非阻塞web框架已經完成了。當然這個框架說到底還是太簡陋,後續完全可以優化HttpRequest和HttpResponse、增加對資料庫、模板語言等等元件的擴充套件。
完整原始碼已經上傳至https://github.com/sswest/AsyncWeb