session原始碼流程
阿新 • • 發佈:2020-07-03
app.py
# A first look at the source code behind Flask's session handling.
from flask import Flask

# 1. Instantiate the Flask object.
app = Flask(__name__)

# 2. Register routes.
#    self.url_map = Map()
#    url_map is an object that can be pictured as a list or dict holding
#    every routing rule, e.g.:
#        app.url_map = [
#            ('/index', index),
#            ('/login', login),
#        ]


@app.route('/index')
def index():
    return "index"


if __name__ == '__main__':
    # 3. Start the socket server.
    app.run()


# ---------------------------------------------------------------------------
# app.py -- abridged excerpt of the Flask class, for reading only.
# "# ..." marks code that was left out; the imports this excerpt relies on
# (sys, chain, RequestContext, _request_ctx_stack, the request_* signals, ...)
# are not shown either.
# ---------------------------------------------------------------------------
class Flask(_PackageBoundObject):
    """Abridged view of the application object, limited to the request path.

    Call chain shown here:
    ``__call__`` -> ``wsgi_app`` -> ``full_dispatch_request`` ->
    ``dispatch_request`` (view function) -> ``finalize_request`` ->
    ``process_response`` (where the session is saved).
    """

    # 4. A user request arrives.
    def __call__(self, environ, start_response):
        """WSGI entry point; simply delegates to :meth:`wsgi_app`.

        :param environ: all request-related data, already given a first
            layer of wrapping by the WSGI server.
        :param start_response: callable used to set response-related data.
        """
        return self.wsgi_app(environ, start_response)

    # ...

    def request_context(self, environ):
        """Build a ``RequestContext`` for this app and the given environ."""
        return RequestContext(self, environ)

    # ...

    def wsgi_app(self, environ, start_response):
        """Handle one request from start to finish.

        Author's outline of what happens on the way in:

        1. Take ``environ`` and wrap it once more.
        2. Read the cookie named "session" from ``environ``, decrypt and
           deserialize it.
        3. Put both objects into "some magical place" (the per-request
           storage that the context is pushed onto).
        """
        # The next statement is (per the author's notes) equivalent to:
        #   ctx = RequestContext(self, environ)   # self is the app object,
        #                                         # environ the raw request data
        #   ctx.request = request_class(environ)
        #   ctx.session = None
        ctx = self.request_context(environ)
        error = None
        try:
            try:
                # Store ctx in the "magical place". According to the author,
                # push() runs SecureCookieSessionInterface.open_session, which
                # reads the cookie and re-assigns ctx.session (push() itself
                # is not part of this excerpt).
                ctx.push()
                # 5. Run the view function.
                # 6. Still inside this call, process_response() signs and
                #    serializes the session and writes it to the cookie
                #    through session_interface.save_session().
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                # Deliberately bare: remember any non-Exception error for the
                # finally block below, then let it propagate unchanged.
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            # 7. Remove ctx from the "magical place"; per the author's notes
            #    the whole request/response cycle ends here (auto_pop itself
            #    is not part of this excerpt).
            ctx.auto_pop(error)

    session_interface = SecureCookieSessionInterface()

    # ...

    def dispatch_request(self):
        """Look up the view function for the current request and call it.

        Returns whatever the view function returns, or the default OPTIONS
        response when the rule provides automatic options.
        """
        req = _request_ctx_stack.top.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        rule = req.url_rule
        # if we provide automatic options for this URL and the
        # request came with the OPTIONS method, reply automatically
        if (
            getattr(rule, "provide_automatic_options", False)
            and req.method == "OPTIONS"
        ):
            return self.make_default_options_response()
        # otherwise dispatch to the handler for that endpoint
        return self.view_functions[rule.endpoint](**req.view_args)

    def full_dispatch_request(self):
        """Run pre-processing, the view function and response finalization."""
        self.try_trigger_before_first_request_functions()
        try:
            request_started.send(self)
            rv = self.preprocess_request()
            if rv is None:
                # Call the view function.
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        # The view function is done; perform the wrap-up work.
        return self.finalize_request(rv)

    def finalize_request(self, rv, from_error_handler=False):
        """Turn ``rv`` into a response object and post-process it.

        When ``from_error_handler`` is true, a failure during post-processing
        is logged instead of re-raised.
        """
        response = self.make_response(rv)
        try:
            response = self.process_response(response)
            request_finished.send(self, response=response)
        except Exception:
            if not from_error_handler:
                raise
            self.logger.exception(
                "Request finalizing failed with an error while handling an error"
            )
        return response

    def process_response(self, response):
        """Run the after-request handlers, then persist the session.

        Returns the response as returned by the last handler.
        """
        ctx = _request_ctx_stack.top
        bp = ctx.request.blueprint
        funcs = ctx._after_request_functions
        if bp is not None and bp in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
        if None in self.after_request_funcs:
            funcs = chain(funcs, reversed(self.after_request_funcs[None]))
        for handler in funcs:
            response = handler(response)
        # This is the point where the session is written back to the cookie.
        if not self.session_interface.is_null_session(ctx.session):
            self.session_interface.save_session(self, ctx.session, response)
        return response


# Key names to follow when tracing the flow:
#   app.wsgi_app
#   app.request_class
#   app.session_interface
#   app.full_dispatch_request
session.py
class SecureCookieSessionInterface(SessionInterface):
    """Default session interface: the session lives in a signed cookie.

    Signing and (de)serialization are done through :mod:`itsdangerous`.
    """

    #: Salt applied on top of the secret key when signing cookie based
    #: sessions.
    salt = "cookie-session"
    #: Hash function used for the signature; sha1 by default.
    digest_method = staticmethod(hashlib.sha1)
    #: Name of the key derivation supported by itsdangerous; hmac by default.
    key_derivation = "hmac"
    #: Python serializer for the payload. The default is a compact JSON
    #: derived serializer with support for some extra Python types such as
    #: datetime objects or tuples.
    serializer = session_json_serializer
    session_class = SecureCookieSession

    def get_signing_serializer(self, app):
        """Return the signing serializer for ``app``.

        Returns ``None`` when the application has no secret key.
        """
        if not app.secret_key:
            return None
        return URLSafeTimedSerializer(
            app.secret_key,
            salt=self.salt,
            serializer=self.serializer,
            signer_kwargs={
                "key_derivation": self.key_derivation,
                "digest_method": self.digest_method,
            },
        )

    # Runs when a request comes in.
    def open_session(self, app, request):
        """Load the session from the request's session cookie.

        Returns ``None`` when no signing serializer is available, an empty
        session when the cookie is absent or its signature is bad, and a
        session built from the cookie payload otherwise.
        """
        signer = self.get_signing_serializer(app)
        if signer is None:
            return None

        cookie_value = request.cookies.get(app.session_cookie_name)
        if not cookie_value:
            return self.session_class()

        lifetime_seconds = total_seconds(app.permanent_session_lifetime)
        try:
            payload = signer.loads(cookie_value, max_age=lifetime_seconds)
            return self.session_class(payload)
        except BadSignature:
            return self.session_class()

    # Runs when the request is on its way out.
    def save_session(self, app, session, response):
        """Write ``session`` to ``response`` as a signed cookie.

        An empty session never sets a cookie; if it was modified to become
        empty, the existing cookie is deleted instead.
        """
        cookie_domain = self.get_cookie_domain(app)
        cookie_path = self.get_cookie_path(app)

        # Empty session: drop the cookie if it was emptied during this
        # request, and in either case do not set one.
        if not session:
            if session.modified:
                response.delete_cookie(
                    app.session_cookie_name,
                    domain=cookie_domain,
                    path=cookie_path,
                )
            return

        # Add a "Vary: Cookie" header if the session was accessed at all.
        if session.accessed:
            response.vary.add("Cookie")

        if not self.should_set_cookie(app, session):
            return

        # Dict displays evaluate in order, so the lookups happen in the
        # sequence httponly, secure, samesite, expires.
        cookie_flags = {
            "httponly": self.get_cookie_httponly(app),
            "secure": self.get_cookie_secure(app),
            "samesite": self.get_cookie_samesite(app),
            "expires": self.get_expiration_time(app, session),
        }
        signed_value = self.get_signing_serializer(app).dumps(dict(session))
        response.set_cookie(
            app.session_cookie_name,
            signed_value,
            domain=cookie_domain,
            path=cookie_path,
            **cookie_flags,
        )