1. 程式人生 > 實用技巧 >Flask - 進階二

Flask - 進階二

flask上下文管理

內容

  1. django和flask的區別

    - 概括的區別
    - django中提供功能列舉
    - 請求處理機制不同,django是通過傳參的形式,flask是通過上下文管理的方式實現。 
    
  2. wsgi

    django和flask內部都沒有實現socket,而是wsgi實現。
    wsgi是web服務網管介面,他是一個協議,實現它的協議的有:wsgiref/werkzurg/uwsgi
    
    # django之前
    from wsgiref.simple_server import make_server
     
    def run(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/html')])
        return [bytes('<h1>Hello, web!</h1>', encoding='utf-8'), ]
     
    if __name__ == '__main__':
        httpd = make_server('127.0.0.1', 8000, run)
        httpd.serve_forever()
    
    # flask之前
    from werkzeug.serving import run_simple
    from werkzeug.wrappers import BaseResponse
    
    
    def func(environ, start_response):
        print('請求來了')
        response = BaseResponse('你好')
        return response(environ, start_response)
    
    
    if __name__ == '__main__':
        run_simple('127.0.0.1', 5000, func)
    
  3. web框架都有的功能:路由、檢視、模板

  4. before_request/after_request

    相當於django的中介軟體,對所有的請求定製功能。 
    
  5. tempalte_global / template_filter

    定製在所有模板中都可以使用函式
    
  6. 路由系統處理本質 @app.route

    將url和函式打包成rule,天劍到map物件,map再放到app中。
    
  7. 路由

    • 裝飾器實現 / add_url_rule
    • endpoint
    • 動態路由
    • 如果給檢視加裝飾器:放route下面 、 functools
  8. 檢視

    • FBV
    • CBV(返回一個view函式,閉包的應用場景)
    • 應用到的功能都是通過匯入方式:request/session
  9. flask中支援session

    預設將session加密,然後儲存在瀏覽器的cookie中。 
    
  10. 模板比django方便一點

    支援python原生的語法
    
  11. 藍圖

    幫助我們可以對很多的業務功能做拆分,建立多個py檔案,把各個功能放置到各個藍圖,最後再將藍圖註冊到flask物件中。 
    
    幫助我們做目錄結構的拆分。
    
  12. threading.local物件

    自動為每個執行緒開闢空間,讓你進行存取值。
    
  13. 資料庫連結池 DBUtils (SQLHelper)

  14. 面向物件上下文管理(with)

概要

  • flask上下文原始碼
  • flask的擴充套件

詳細

1. 棧

後進先出,通過列表可以實現一個棧。

v = [11,22,33]
v.append(44)
v.pop()

應用場景:

  • 節流

2. 面向物件

class Foo(object):

    def __setattr__(self, key, value):
        print(key,value)
    
    def __getattr__(self, item):
        print(item)

obj = Foo()
obj.x = 123
obj.x
  • drf中request

    request.data
    request.query_params
    request._request
    request._request.POST
    request._request.GET
    
    #
    request.data
    request.query_params
    request.POST
    request.Data
    
class Local(object):
    def __init__(self):
        # self.storage = {}
        object.__setattr__(self,"storage",{})

    def __setattr__(self, key, value):
        self.storage[key] = value

    def __getattr__(self, item):
        return self.storage.get(item)

local = Local()
local.x1 = 123
print(local.x1)

3.執行緒唯一標識

import threading
from threading import get_ident

def task():
    ident = get_ident()
    print(ident)
for i in range(20):
    t = threading.Thread(target=task)
    t.start()

4.自定義threading.local

import threading
"""
storage = {
    1111:{'x1':[0,1,2,3]},
    1112:{'x1':1}
    1113:{'x1':2}
    1114:{'x1':3}
    1115:{'x1':4}
}
"""
class Local(object):
    def __init__(self):
        object.__setattr__(self,'storage',{})

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in self.storage:
            self.storage[ident][key] = value
        else:
            self.storage[ident] = {key:value}

    def __getattr__(self, item):
        ident = threading.get_ident()
        if ident not in self.storage:
            return
        return self.storage[ident].get(item)

local = Local()

def task(arg):
    local.x1 = arg
    print(local.x1)

for i in range(5):
    t = threading.Thread(target=task,args=(i,))
    t.start()

5.加強版threading.local

import threading
"""
storage = {
    1111:{'x1':[]},
    1112:{'x1':[]}
    1113:{'x1':[]}
    1114:{'x1':[]}
    1115:{'x1':[]},
    1116:{'x1':[]}
}
"""
class Local(object):
    def __init__(self):
        object.__setattr__(self,'storage',{})

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in self.storage:
            self.storage[ident][key].append(value)
        else:
            self.storage[ident] = {key:[value,]}

    def __getattr__(self, item):
        ident = threading.get_ident()
        if ident not in self.storage:
            return
        return self.storage[ident][item][-1]

local = Local()

def task(arg):
    local.x1 = arg
    print(local.x1)

for i in range(5):
    t = threading.Thread(target=task,args=(i,))
    t.start()

6.flask原始碼關於local的實現

try:
    # 協程
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident
    except ImportError:
        from _thread import get_ident
"""
__storage__ = {
    1111:{"stack":[汪洋] }
}
"""
class Local(object):

    def __init__(self):
        # self.__storage__ = {}
        # self.__ident_func__ = get_ident
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __iter__(self):
        return iter(self.__storage__.items())

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__() # 1111
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

class LocalStack(object):
    def __init__(self):
        self._local = Local()
    def push(self, obj):
        """Pushes a new item to the stack"""
        # self._local.stack == getattr
        # rv = None
        rv = getattr(self._local, "stack", None)
        if rv is None:
            self._local.stack = rv = []
        rv.append(obj)
        return rv

    def pop(self):
        stack = getattr(self._local, "stack", None)
        if stack is None:
            return None
        elif len(stack) == 1:
            # release_local(self._local)
            # del __storage__[1111]
            return stack[-1]
        else:
            return stack.pop()

    @property
    def top(self):
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None

obj = LocalStack()
obj.push('汪洋')
obj.push('成說')

print(obj.top)

obj.pop()
obj.pop()

總結:

在flask中有個local類,他和threading.local的功能一樣,為每個執行緒開闢空間進行存取資料,他們兩個的內部實現機制,內部維護一個字典,以執行緒(協程)ID為key,進行資料隔離,如:
    __storage__ = {
		1211:{'k1':123}
    }
    
    obj = Local()
    obj.k1 = 123
    
在flask中還有一個LocalStack的類,他內部會依賴local物件,local物件負責儲存資料,localstack物件用於將local中的值維護成一個棧。
	__storage__ = {
		1211:{'stack':['k1',]}
    }

	obj= LocalStack()
    obj.push('k1')
    obj.top
    obj.pop()

7.flask原始碼中總共有2個localstack物件

# context locals
__storage__ = {
	1111:{'stack':[RequestContext(reqeust,session),]},
    1123:{'stack':[RequestContext(reqeust,session),]},
}
_request_ctx_stack = LocalStack()


__storage__ = {
	1111:{'stack':[AppContenxt(app,g),]}
    1123:{'stack':[AppContenxt(app,g),]},
}
_app_ctx_stack = LocalStack()
_request_ctx_stack.push('小魔方')
_app_ctx_stack.push('大魔方')
  • 上下文管理
    • 請求上下文管理
    • 應用上下文管理

7.原始碼初識

7.1 專案啟動

  • 例項化Flask物件

    app = Flask(__name__)
    
    1. 對app物件封裝一些初始化的值。
    	app.static_url_path
    	app.static_folder
    	app.template_folder
    	app.view_functions = {}
    2. 新增靜態檔案的路由
        self.add_url_rule(
            self.static_url_path + "/<path:filename>",
            endpoint="static",
            host=static_host,
            view_func=self.send_static_file,
            )
            
    3. 例項化了url_map的物件,以後在map物件中放 【/index/ 函式的物件應觀】
        class Flask(object):
            url_rule_class = Rule
            url_map_class = Map
    
            def __init__(self...):
                self.static_url_path
                self.static_folder
                self.template_folder
                self.view_functions = {}
                self.url_map = self.url_map_class()
        app = Flask()
        app.view_functions
    app.url_rule_class
    
  • 載入配置檔案(給app的config進行賦值)

    from flask import Flask
    
    app = Flask(__name__,static_url_path='/xx')
    
    app.config.from_object('xx.xx')
    
    1. 讀取配置檔案中的所有鍵值對,並將鍵值對全都放到Config物件。(Config是一個字典)
    2. 把包含所有配置檔案的Config物件,賦值給 app.config
    
    
  • 新增路由對映

    from flask import Flask
    
    app = Flask(__name__,static_url_path='/xx')
    
    @app.route('/index')
    def index():
        return 'hello world'
    
    1. 將 url = /index  和  methods = [GET,POST]  和 endpoint = "index"封裝到Rule物件
    
    2. 將Rule物件新增到 app.url_map中。
    
    3. 把endpoint和函式的對應關係放到 app.view_functions中。
    
  • 截止目前

    app.config
    app.url_map
    app.view_functions
    
  • 執行flask

    from flask import Flask
    
    app = Flask(__name__,static_url_path='/xx')
    
    @app.route('/index')
    def index():
        return 'hello world'
    
    if __name__ == '__main__':
        app.run()
    
    1. 內部呼叫werkzeug的run_simple,內部建立socket,監聽IP和埠,等待使用者請求到來。
    
    2. 一旦有使用者請求,執行app.__call__方法。
    
    	class Flask(object):
            def __call__(self,envion,start_response):
                pass
            def run(self):
    			run_simple(host, port, self, **options)
    
        if __name__ == '__main__':
            app.run()
    

擴充套件

class Foo:
     # 

obj = Foo()
obj() # __call__

obj[x1] = 123 # __setitem__
obj[x2]  # __getitem__

obj.x1 = 123 # __setattr__
obj.x2  # __getattr__

SQLhelper

  • 方式一

    import pymysql
    import threading
    from DBUtils.PooledDB import PooledDB
    
    """
    storage = {
        1111:{'stack':[]}
    }
    """
    
    class SqlHelper(object):
        def __init__(self):
            self.pool = PooledDB(
                creator=pymysql,  # 使用連結資料庫的模組
                maxconnections=6,  # 連線池允許的最大連線數,0和None表示不限制連線數
                mincached=2,  # 初始化時,連結池中至少建立的連結,0表示不建立
                blocking=True,  # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯
                ping=0,
                # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                host='127.0.0.1',
                port=3306,
                user='root',
                password='222',
                database='cmdb',
                charset='utf8'
            )
            self.local = threading.local()
    
        def open(self):
            conn = self.pool.connection()
            cursor = conn.cursor()
            return conn, cursor
    
        def close(self, cursor, conn):
            cursor.close()
            conn.close()
    
        def fetchall(self, sql, *args):
            """ 獲取所有資料 """
            conn, cursor = self.open()
            cursor.execute(sql, args)
            result = cursor.fetchall()
            self.close(conn, cursor)
            return result
    
        def fetchone(self, sql, *args):
            """ 獲取所有資料 """
            conn, cursor = self.open()
            cursor.execute(sql, args)
            result = cursor.fetchone()
            self.close(conn, cursor)
            return result
    
        def __enter__(self):
            conn,cursor = self.open()
            rv = getattr(self.local,'stack',None)
            if not rv:
                self.local.stack = [(conn,cursor),]
            else:
                rv.append((conn,cursor))
                self.local.stack = rv
            return cursor
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            rv = getattr(self.local,'stack',None)
            if not rv:
                # del self.local.stack
                return
            conn,cursor = self.local.stack.pop()
            cursor.close()
            conn.close()
    
    db = SqlHelper()
    
    from sqlhelper import db
    
    
    # db.fetchall(...)
    # db.fetchone(...)
    
    with db as c1:
        c1.execute('select 1')
        with db as c2:
            c1.execute('select 2')
        print(123)
    
    
    
  • 方式二

    import pymysql
    import threading
    from DBUtils.PooledDB import PooledDB
    
    POOL = PooledDB(
                creator=pymysql,  # 使用連結資料庫的模組
                maxconnections=6,  # 連線池允許的最大連線數,0和None表示不限制連線數
                mincached=2,  # 初始化時,連結池中至少建立的連結,0表示不建立
                blocking=True,  # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯
                ping=0,
                # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
                host='127.0.0.1',
                port=3306,
                user='root',
                password='222',
                database='cmdb',
                charset='utf8'
            )
    
    class SqlHelper(object):
        def __init__(self):
            self.conn = None
            self.cursor = None
    
        def open(self):
            conn = POOL.connection()
            cursor = conn.cursor()
            return conn, cursor
    
        def close(self):
            self.cursor.close()
            self.conn.close()
    
        def __enter__(self):
            self.conn,self.cursor = self.open()
            return self.cursor
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()
    
    # ################## 使用 ##################
    
    with SqlHelper() as c1:
        c1.execute('select 1')
        with SqlHelper() as c2:
            c2.execute('select 2')
        print(666)
    
    with SqlHelper() as cursor:
        cursor.execute('select 1')
    
    with SqlHelper() as cursor:
        cursor.execute('select 1')