1. 程式人生 > >150行程式碼搭建非同步非阻塞Web框架

150行程式碼搭建非同步非阻塞Web框架

最近看Tornado原始碼給了我不少啟發,心血來潮決定自己試著只用python標準庫來實現一個非同步非阻塞web框架。花了點時間感覺還可以,一百多行的程式碼已經可以撐起一個極簡框架了。

一、準備工作

需要的相關知識點:

  • HTTP協議的請求和響應
  • IO多路複用
  • asyncio

掌握上面三個點的知識就完全沒有問題,不是很清楚的同學我也推薦幾篇參考文章

  HTTP協議詳細介紹(https://www.cnblogs.com/haiyan123/p/7777924.html)

  Python篇-IO多路複用詳解(https://www.jianshu.com/p/818f27379a5e)

  Python非同步IO之協程(一):從yield from到async的使用(https://blog.csdn.net/SL_World/article/details/86597738)

實驗環境:

python 3.7.3

 由於在框架中會使用到async/await關鍵字,所以只要確保python版本在3.5以上即可。

 二、框架功能目標

我們的框架要實現最基本的幾個功能:

  • 封裝HTTP請求響應
  • 路由對映
  • 類檢視和函式檢視
  • 協程支援

 當然一個完善的web框架需要實現的遠遠不止這些,這裡我們現在只需要它能跑起來就足夠了。

三、封裝HTTP協議

HTTP是基於TCP/IP通訊協議來實現資料傳輸,與一般的C/S相比,它的特點在於當客戶端(瀏覽器)向服務端發起HTTP請求,服務端響應資料後雙方立馬斷開連線,服務端無法主動向客戶端傳送資料。HTTP協議資料傳輸內容分為請求頭和請求體,請求頭和請求體之間使用"\r\n\r\n"進行分隔。在請求頭中,第一行包含了請求方式,請求路徑和HTTP協議,此後每一行以key: value的形式傳輸資料。

對於我們的web服務端來說,需要的就是解析http請求和處理http響應。

我們通過寫兩個類,HttpRequest和HttpResponse來實現。

3.1 HttpRequest

HttpRequest設計目標是解析從socket接收request資料

 1 class HttpRequest(object):
 2     def __init__(self, content: bytes):
 3         self.content = content.decode('utf-8')
 4         self.headers = {}
 5         self.GET = {}
 6         self.url = ''
 7         self.full_path = ''
 8         self.body = ''
 9         try:
10             header, self.body = self.content.split('\r\n\r\n')
11             temp = header.split('\r\n')
12             first_line = temp.pop(0)
13             self.method, self.url, self.protocol = first_line.split(' ')
14             self.full_path = self.url
15             for t in temp:
16                 k, v = t.split(': ', 1)
17                 self.headers[k] = v
18         except Exception as e:
19             print(e)
20         if len(self.url.split('?')) > 1: # 解析GET引數
21             self.url = self.full_path.split('?')[0] # 把url中攜帶的引數去掉
22             parms = self.full_path.split('?')[1].split('&')
23             for p in parms: # 將GET引數新增到self.GET字典
24                 k, v = p.split('=')
25                 self.GET[k] = v

在類中,我們實現解析http請求的headers、method、url和GET引數,其實還有很多事情沒有做,比如使用POST傳輸資料時,資料是在請求體中,針對這部分內容我並沒有開始寫,原因在於本文主要目的還是非同步非阻塞框架,目前的功能已經足以支援我們進行下一步實驗了。

3.2 HttpResponse

HTTP響應也可以分為響應頭和響應體,我們可以很簡單的實現一個response:

 1 class HttpResponse(object):
 2     def __init__(self, data: str):
 3         self.status_code = 200 # 預設響應狀態 200
 4         self.headers = 'HTTP/1.1 %s OK\r\n'
 5         self.headers += 'Server:AsyncWeb'
 6         self.headers += '\r\n\r\n'
 7         self.data = data
 8 
 9     @property
10     def content(self):
11         return bytes((self.headers + self.data) % self.status_code, encoding='utf8')

HttpResponse中並沒有做太多的事情,接受一個字串,並使用content返回一個滿足HTTP響應格式的bytes。

從使用者呼叫角度,可以使用return HttpResponse("歡迎來到AsynicWeb")來返回資料。

我們也可以簡單的定義一個404頁面:

Http404 = HttpResponse('<html><h1>404</h1></html>')
Http404.status_code = 404

四、路由對映

路由對映簡單理解就是從一個URL地址找到對應的邏輯函式。舉個例子,我們訪問http://127.0.0.1:8000這個頁面,在http請求中它的url是"/",在web伺服器中有一個函式index,web伺服器能夠由url地址"/"找到函式index,這就是一個路由對映。

其實路由對映實現起來非常簡單。我們只要定義一個對映列表,列表中的每個元素包含url和邏輯處理(檢視函式)兩部分,當一個http請求到達的時候,遍歷對映列表,使用正則匹配每一個url,如果請求的url和對映表中的相同,我們就可以取出對應的檢視函式。

路由對映表是完全由使用者來定義對映關係的,它應該使用一個我們定義的標準結構,比如:

routers = [
    ('/$', IndexView),
    ('/home', asy)
]

 

五、類檢視和函式檢視

檢視是指能夠根據一個請求,執行某些邏輯運算,最終返回響應的模組。說到這裡,一個web框架的執行流程就出來了:

    http請求——路由對映表——檢視——執行檢視獲取返回值——http響應

在我們的框架中,借鑑Django的設計,我們讓它支援類檢視(CBV)和函式檢視(FBV)兩種模式。

對於函式檢視,完全由使用者自己定義,只要至少能夠接受一個request引數即可

對於類檢視,我們需要做一些預處理,確保使用者按我們的規則來實現類檢視。

定義一個View類:

1 class View(object):
2     # CBV應繼承View類
3     def dispatch(self, request):
4         method = request.method.lower()
5         if hasattr(self, method):
6             return getattr(self, method)(request)
7         else:
8             return Http404

 在View類中,我們只寫了一個dispatch方法,其實就做了一件事:反射。當我們在路由對映表中找對應的檢視時,如果判斷檢視屬於類,我們就呼叫dispatch方法。

從使用者角度來看,實現一個CBV只需要繼承View類,然後通過定義get、post、delete等方法來實現不同的處理。

六、socket和多路複用

上面幾個小節實現了web框架的大體執行路徑,從這節開始我們實現web伺服器的核心。

通過IO多路複用可以達到單執行緒實現高併發的效果,一個標準的IO多路複用寫法:

 1 server = socket(AF_INET, SOCK_STREAM)
 2 server.bind(("127.0.0.1", 8000))
 3 server.setblocking(False) # 設定非阻塞
 4 server.listen(128)
 5 Future_Task_Wait = {}
 6 rlist = [server, ]
 7 while True:
 8     r, w, x = select.select(rlist, [], [], 0.1)
 9     for o in r:
10         if o == server:
11             '''判斷o是server還是conn'''
12             conn, addr = o.accept()
13             conn.setblocking(False) # 設定非阻塞
14             rlist.append(conn) # 客戶連線 加入輪詢列表
15         else:
16             data = b""
17             while True: # 接收客戶傳輸資料
18                 try:
19                     chunk = o.recv(1024)
20                     data = data + chunk
21                 except Exception as e:
22                     chunk = None
23                 if not chunk:
24                     break
25             dosomething(o, data, routers) # 拿到資料乾點啥

 通過這段程式碼我們可以獲得所有的請求了,下一步就是處理這些請求。

我們就定義一個dosomething函式

 1 import re
 2 import time
 3 from types import FunctionType
 4 
 5 def dosomething(o, data, routers):
 6     '''解析http請求,尋找對映函式並執行得到結果
7 :param o: socket連線物件 8 :param data: socket接收資料 9 :return: 響應結果 10 ''' 11 request = HttpRequest(data) 12 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 13 request.method, request.url) 14 flag = False 15 for router in routers: 16 if re.match(router[0], request.url): 17 target = router[1] 18 flag = True 19 break 20 if flag: 21 # 判斷targe是函式還是類 22 if isinstance(target, FunctionType): 23 result = target(request) 24 elif issubclass(target, View): 25 result = target().dispatch(request) 26 else: 27 result = Http404 28 else: 29 result = Http404 30 return result

這段程式碼做了這麼幾件事。1.例項化HttpRequest;2.使用正則遍歷路由對映表;3.將request傳入檢視函式或類檢視的dispatch方法;4.拿到result結果

我們通過result = dosomething(o, data, routers)可以拿到結果,接下來我們只需要把結果發回給客戶端並斷開連線就可以了

o.sendall(result.content)  # 由於result是一個HttpResponse物件 我們使用content屬性
rlist.remove(o) # 從輪詢中刪除連線
o.close() # 關閉連線

至此,我們的web框架已經搭建好了。

但它還是一個同步的框架,在我們的服務端中,其實一直通過while迴圈在監聽select是否變化,假如我們在檢視函式中新增IO操作,其他連線依然會阻塞等待,接下來讓我們的框架實現對協程的支援。

七、協程支援

在實現協程之前,我們先聊聊Tornado的Future物件。可以說Tornado非同步非阻塞的實現核心就是Future。

Future物件內部維護了一個重要屬性_result,這是一個標記位,一個剛例項化的Future內部的_result=None,我們可以通過其他操作來更改_result的狀態。另一方面,我們可以一直監聽每個Future物件的_result狀態,如果發生變化就執行某些特定的操作。

我們在第六節定義的dosomething函式中拿到了一個result,它應當是一個HttpResponse物件,那麼能不能返回一個Future物件呢。

假如result是一個Future物件,我們的服務端不立馬返回結果,而是把Future放進另一個輪詢列表中,當Future內的_result改變時再返回結果,就達到了非同步的效果。

我們也可以定義一個Future類,這個類維護只一個變數result:

1 class Future(object):
2     def __init__(self):
3         self.result = None

 對於框架使用者來說,在檢視函式要麼返回一個HttpResponse物件代表立即返回,要麼返回一個Future物件說你先別管我,我把事情幹完了再通知你返回結果。

既然檢視函式返回的可能不只是HttpResponse物件,那麼我們就需要對第六步的程式碼增加額外的處理:

Future_Task_Wait = {} # 定義一個非同步Future字典
result = dosomething() # 拿到結果後執行下面判斷
if isinstance(result, Future):
    Future_Task_Wait[o] = result # Futre物件則加入字典
else:
    o.sendall(result.content) # 非Future物件直接返回結果並斷開連線
    rlist.remove(o)
    o.close()

在while True輪詢內再增加一段程式碼,遍歷Future_Task_Wait字典:

rm_conn = [] # 需要移除列表的conn
for conn, future in Future_Task_Wait.items():
    if future.result:
        try:
            conn.sendall(HttpResponse(data=future.result).content) # 返回result
        finally:
            rlist.remove(conn) 
            conn.close()
            rm_conn.append(conn)
for conn in rm_conn: # 在字典中刪除conn
    del Future_Task_Wait[conn]

 這樣,我們就可以返回一個Future來告訴伺服器這是將來才返回的物件。

那回歸正題,我們到底該如何使用協程?這裡我用的方法是建立一個子執行緒來執行協程事件迴圈,主執行緒永遠在監聽socket。

from threading import Thread
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
coroutine_loop = asyncio.new_event_loop()  # 建立協程事件迴圈
run_loop_thread = Thread(target=start_loop, args=(coroutine_loop,))  # 新起執行緒執行事件迴圈, 防止阻塞主執行緒
run_loop_thread.start()  # 執行執行緒,即執行協程事件迴圈

當我們要把asyncdo方法新增作為協程任務時

asyncio.run_coroutine_threadsafe(asyncdo(), coroutine_loop)

好了,非同步非阻塞的核心程式碼分析的差不多了,將六七節的程式碼整合寫成一個類

  1 import re
  2 import time
  3 import select
  4 import asyncio
  5 from socket import *
  6 from threading import Thread
  7 from types import FunctionType
  8 from http.response import Http404, HttpResponse
  9 from http.request import HttpRequest
 10 from views import View
 11 from core.future import Future
 12 
 13 class App(object):
 14     # web應用程式
 15     coroutine_loop = None
 16 
 17     def __new__(cls, *args, **kwargs):
 18         # 使用單例模式
 19         if not hasattr(cls, '_instance'):
 20             App._instance = super().__new__(cls)
 21         return App._instance
 22 
 23     def listen(self, host, port, routers):
 24         # IO多路複用監聽連線
 25         server = socket(AF_INET, SOCK_STREAM)
 26         server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
 27         server.bind((host, port))
 28         server.setblocking(False)
 29         server.listen(128)
 30         Future_Task_Wait = {}
 31         rlist = [server, ]
 32         while True:
 33             r, w, x = select.select(rlist, [], [], 0.01)
 34             for o in r:
 35                 if o == server:
 36                     '''判斷o是server還是conn'''
 37                     conn, addr = o.accept()
 38                     conn.setblocking(False)
 39                     rlist.append(conn)
 40                 else:
 41                     data = b""
 42                     while True:
 43                         try:
 44                             chunk = o.recv(1024)
 45                             data = data + chunk
 46                         except Exception as e:
 47                             chunk = None
 48                         if not chunk:
 49                             break
 50                     try:
 51                         request = HttpRequest(data, o)
 52                         print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0],
 53                               request.method, request.url)
 54                         flag = False
 55                         for router in routers:
 56                             if re.match(router[0], request.url):
 57                                 target = router[1]
 58                                 flag = True
 59                                 break
 60                         if flag:
 61                             # 判斷targe是函式還是類
 62                             if isinstance(target, FunctionType):
 63                                 result = target(request)
 64                             elif issubclass(target, View):
 65                                 result = target().dispatch(request)
 66                             else:
 67                                 result = Http404
 68                         else:
 69                             result = Http404
 70                         # 判斷result是不是future
 71                         if isinstance(result, Future):
 72                             Future_Task_Wait[o] = result
 73                         else:
 74                             o.sendall(result.content)
 75                             rlist.remove(o)
 76                             o.close()
 77                     except Exception as e:
 78                         print(e)
 79             rm_conn = []
 80             for conn, future in Future_Task_Wait.items():
 81                 if future.result:
 82                     try:
 83                         conn.sendall(HttpResponse(data=future.result).content)
 84                     finally:
 85                         rlist.remove(conn)
 86                         conn.close()
 87                         rm_conn.append(conn)
 88             for conn in rm_conn:
 89                 del Future_Task_Wait[conn]
 90 
 91     def run(self, host='127.0.0.1', port=8000, routers=()):
 92         # 主執行緒select多路複用,處理http請求和響應
 93         # 給協程單獨建立一個子執行緒,負責處理View函式提交的協程
 94         def start_loop(loop):
 95             asyncio.set_event_loop(loop)
 96             loop.run_forever()
 97         self.coroutine_loop = asyncio.new_event_loop()  # 建立協程事件迴圈
 98         run_loop_thread = Thread(target=start_loop, args=(self.coroutine_loop,))  # 新起執行緒執行事件迴圈, 防止阻塞主執行緒
 99         run_loop_thread.start()  # 執行執行緒,即執行協程事件迴圈
100         self.listen(host, port, routers)

八、框架測試

現在,可以測試我們的web框架了。

 1 import asyncio
 2 from core.server import App
 3 from views import View
 4 from http.response import *
 5 from core.future import Future
 6 
 7 
 8 class IndexView(View):
 9     def get(self, request):
10         return HttpResponse('歡迎來到首頁')
11 
12     def post(self, request):
13         return HttpResponse('post')
14 
15 def asy(request):
16     future = Future()
17     print('非同步呼叫')
18     wait = request.url.split('/')[-1]
19     try:
20         wait = int(wait)
21     except:
22         wait = 5
23     asyncio.run_coroutine_threadsafe(dosomething(future, wait), app.coroutine_loop)
24     print('返回Future')
25     return future
26 
27 async def dosomething(future, wait):
28     # 非同步函式
29     await asyncio.sleep(wait)# 模擬非同步操作
30     future.result = '等待了%s秒' % wait
31 
32 routers = [
33     ('/$', IndexView),
34     ('/home', asy)
35 ]
36 
37 # 從使用者角度只需使用run()
38 app = App()
39 app.run('127.0.0.1', 8080, routers=routers)

瀏覽器訪問http://127.0.0.1:8080,返回沒有問題,如果有同學使用Chrome可能會亂碼,那是因為我們的HttpResponse沒有返回指定編碼,新增一個響應頭即可。

瀏覽器訪問http://127.0.0.1:8080/home,這時候會執行協程,預設等待5s後返回結果,你可以在多個標籤頁訪問這個地址,通過等待時間來驗證我們的非同步框架是否正常工作。

九、其他

至此,我們要實現的非同步非阻塞web框架已經完成了。當然這個框架說到底還是太簡陋,後續完全可以優化HttpRequest和HttpResponse、增加對資料庫、模板語言等等元件的擴充套件。

完整原始碼已經上傳至https://github.com/sswest/AsyncWeb