1. 程式人生 > >Tornado之非同步authenticated

Tornado之非同步authenticated

authenticated是tornado自帶的登入驗證裝飾器,它的實現比較簡單,驗證比較簡易,無法做到真正意義的前後端分離並且是同步的方式,所以這裡我對它進行了重寫,以適應非同步JWT方式的登入驗證。

Tornado自帶的authenticated原始碼:
def authenticated(method): 
@functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if not self.current_user:
            if self.request.method in ("
GET", "HEAD"): url = self.get_login_url() if "?" not in url: if urlparse.urlsplit(url).scheme: # if login url is absolute, make next absolute too next_url = self.request.full_url()
else: next_url = self.request.uri url += "?" + urlencode(dict(next=next_url)) self.redirect(url) return raise HTTPError(403) return method(self, *args, **kwargs) return wrapper def get_login_url(self):
"""Override to customize the login URL based on the request. By default, we use the ``login_url`` application setting. """ self.require_setting("login_url", "@tornado.web.authenticated") return self.application.settings["login_url"] def redirect(self, url, permanent=False, status=None): if self._headers_written: raise Exception("Cannot redirect after headers have been written") if status is None: status = 301 if permanent else 302 else: assert isinstance(status, int) and 300 <= status <= 399 self.set_status(status) self.set_header("Location", utf8(url)) self.finish()

從原始碼可以看出,authenticated的作用:當current_user不存在時,它會呼叫get_login_url方法從settings裡面去取login_url,從而獲取user返回,當user未登入時,它會呼叫redirect重定向,返回301。


改寫步驟:
1.將同步的方法使用協程改寫
2.以JWT的方式校驗使用者token,使用者不存在或token已過期直接返回狀態碼,限制繼續訪問介面

實現程式碼:
 1 def authenticated(func):
 2     """
 3     重寫tornado authenticated
 4     :param func:
 5     :return:
 6     """
 7 
 8     async def wrapper(self, *args, **kwargs):
 9         res_data = {}
10         token = self.request.headers.get("token")
11         if token:
12             try:
13                 send_data = jwt.decode(
14                     token, self.settings["secret_key"],
15                     leeway=self.settings["jwt_expire"],
16                     options={"verify_exp": True}
17                 )
18                 user_id = send_data["id"]
19 
20                 # 從資料庫中獲取到user並設定給_current_user
21                 try:
22                     user = await self.application.objects.get(
23                         User, id=user_id
24                     )
25                     self._current_user = user
26 
27                     result = await func(self, *args, **kwargs)
28                     return result
29                 except User.DoesNotExist:
30                     res_data["content"] = "使用者不存在"
31                     self.set_status(401)
32             except Exception as e:
33                 print(e)
34                 self.set_status(401)
35                 res_data["content"] = "token不合法或已過期"
36         else:
37             self.set_status(401)
38             res_data["content"] = "缺少token"
39 
40         self.write(res_data)
41 
42     return wrapper
 

 請求介面的時候在headers裡面帶上token即可登入驗證,測試如下: