網站搭建筆記精簡版---廖雪峰WebApp實戰-Day5:編寫Web框架筆記
阿新 • • 發佈:2018-12-18
網站搭建筆記精簡版-廖雪峰教程學習@[三川水祭] 僅作學習交流使用,將來的你會感謝現在拼命努力的自己!!!
本文首先對web框架進行了程式碼上的解釋,之後對編輯middleware部分進行了程式碼的分析,最後講述瞭如何測試從開始到現在所有程式碼的流程。
web框架定義
在編寫過程中,由於aiohttp太過於底層,因此自己定義一個web框架,以實現自動化URL資訊提取與函式的註冊,增加的檔案為coroweb.py
,如下程式碼
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # 匯入非同步工具包 import asyncio, os, inspect, logging, functools # 匯入網頁處理工具包 from urllib import parse # 匯入底層web框架 from aiohttp import web from apis import APIError # 將函式對映為URL處理函式,使得get函式附帶URL資訊 def get(path): ''' Define decorator @get('/path') ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = 'GET' wrapper.__route__ = path return wrapper return decorator # 將函式對映為URL處理函式,使得post函式附帶URL資訊 def post(path): ''' Define decorator @post('/path') ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): return func(*args, **kw) wrapper.__method__ = 'POST' # 儲存方法資訊 wrapper.__route__ = path # 儲存路徑資訊 return wrapper return decorator # 運用inspect模組,建立幾個函式用以獲取URL處理函式與request引數之間的關係 def get_required_kw_args(fn): # 收集沒有預設值的命名關鍵字引數 args = [] params = inspect.signature(fn).parameters # inspect模組是用來分析模組,函式 for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty: args.append(name) return tuple(args) def get_named_kw_args(fn): # 獲取命名關鍵字引數 args = [] params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: args.append(name) return tuple(args) def has_named_kw_args(fn): # 判斷有沒有命名關鍵字引數 params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.KEYWORD_ONLY: return True def has_var_kw_arg(fn): # 判斷有沒有關鍵字引數 params = inspect.signature(fn).parameters for name, param in params.items(): if param.kind == inspect.Parameter.VAR_KEYWORD: return True def has_request_arg(fn): # 判斷是否含有名字叫做'request'引數,且該引數是否為最後一個引數 sig = inspect.signature(fn) params = sig.parameters found = False for name, param in params.items(): if name == 'request': found = True continue if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD): raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig))) return found # 使用RequestHandler函式封裝一個URL處理函式,向request引數獲取URL處理函式所需要的引數 class RequestHandler(object): def __init__(self, app, fn): # 接收app引數 self._app = app self._func = fn self._has_request_arg = has_request_arg(fn) self._has_var_kw_arg = has_var_kw_arg(fn) self._has_named_kw_args = has_named_kw_args(fn) self._named_kw_args = get_named_kw_args(fn) self._required_kw_args = get_required_kw_args(fn) # RequestHandler本身是一個類,由於定義了__call__方法,因此將其例項視為函式 # 該函式從request中獲取必要引數,之後呼叫URL函式 # 最後將結果轉換為web.Response物件。上述比較符合aiohttp框架 async def __call__(self, request): # 構造協程 kw = None if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args: if request.method == 'POST': # 判斷客戶端發來的方法是否是POST if not request.content_type: # 查詢有沒提交資料的格式(EncType) return web.HTTPBadRequest(text='Missing Content-Type.') ct = request.content_type.lower() if ct.startswith('application/json'): params = await request.json() # 讀取請求的body程式碼作為json檔案 if not isinstance(params, dict): return web.HTTPBadRequest(text='JSON body must be object.') kw = params elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'): params = await request.post() kw = dict(**params) else: return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type) if request.method == 'GET': # 判斷客戶端發來的方法是否是GET qs = request.query_string if qs: kw = dict() for k, v in parse.parse_qs(qs, True).items(): kw[k] = v[0] if kw is None: kw = dict(**request.match_info) else: # 當函式引數沒有關鍵字引數時,移去request除命名關鍵字引數外所有的引數資訊 if not self._has_var_kw_arg and self._named_kw_args: # remove all unamed kw: copy = dict() for name in self._named_kw_args: if name in kw: copy[name] = kw[name] kw = copy # check named arg: for k, v in request.match_info.items(): if k in kw: logging.warning('Duplicate arg name in named arg and kw args: %s' % k) kw[k] = v if self._has_request_arg: kw['request'] = request # check required kw:即加入命名關鍵字引數(沒有附加預設值),request沒有提供相應的數值,報錯 if self._required_kw_args: for name in self._required_kw_args: if not name in kw: return web.HTTPBadRequest('Missing argument: %s' % name) logging.info('call with args: %s' % str(kw)) try: r = await self._func(**kw) return r except APIError as e: # APIError另外建立 return dict(error=e.error, data=e.data, message=e.message) # 新增靜態資料夾的路徑 def add_static(app): path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') app.router.add_static('/static/', path) logging.info('add static %s => %s' % ('/static/', path)) # 用來註冊一個URL處理函式,主要起驗證函式是否包含URL的相應方法與路徑資訊,並將其函式變為協程 def add_route(app, fn): method = getattr(fn, '__method__', None) path = getattr(fn, '__route__', None) if path is None or method is None: raise ValueError('@get or @post not defined in %s.' % str(fn)) if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn): fn = asyncio.coroutine(fn) logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys()))) app.router.add_route(method, path, RequestHandler(app, fn)) # 自動將module_name模組中所有符合條件的函式進行註冊 # 只需要向這個函式提供要批量註冊函式的檔案路徑,新編寫的函式就會篩選,註冊檔案內所有符合註冊條件的函式 def add_routes(app, module_name): n = module_name.rfind('.') if n == (-1): mod = __import__(module_name, globals(), locals()) else: name = module_name[n+1:] mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name) for attr in dir(mod): if attr.startswith('_'): continue fn = getattr(mod, attr) if callable(fn): method = getattr(fn, '__method__', None) path = getattr(fn, '__route__', None) if method and path: # 此處查詢path以及method是否存在而不是等待add_route函式查詢 add_route(app, fn)
編輯middleware
將函式返回值變為web.response(),修改的檔案為app.py
,程式碼如下:
#!/usr/bin/env python3 # -*- coding: utf-8 -*- __author__ = 'Michael Liao' ''' async web application. ''' import logging; logging.basicConfig(level=logging.INFO) import asyncio, os, json, time from datetime import datetime from aiohttp import web from jinja2 import Environment, FileSystemLoader import orm from coroweb import add_routes, add_static # 初始化jinja2模板,以便其他函式使用jinja2模板 def init_jinja2(app, **kw): logging.info('init jinja2...') options = dict( autoescape = kw.get('autoescape', True), block_start_string = kw.get('block_start_string', '{%'), block_end_string = kw.get('block_end_string', '%}'), variable_start_string = kw.get('variable_start_string', '{{'), variable_end_string = kw.get('variable_end_string', '}}'), auto_reload = kw.get('auto_reload', True) ) path = kw.get('path', None) if path is None: path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates') logging.info('set jinja2 template path: %s' % path) env = Environment(loader=FileSystemLoader(path), **options) filters = kw.get('filters', None) if filters is not None: for name, f in filters.items(): env.filters[name] = f app['__templating__'] = env # 使用middleware引數將函式返回值轉化為web.response物件 # middlewares是一個攔截器、中間鍵,即在URL真正被處理之前,需要經過一系列middleware的處理。 async def logger_factory(app, handler): # 協程,兩個引數 async def logger(request): logging.info('Request: %s %s' % (request.method, request.path)) # await asyncio.sleep(0.3) return (await handler(request)) return logger async def data_factory(app, handler): async def parse_data(request): if request.method == 'POST': if request.content_type.startswith('application/json'): request.__data__ = await request.json() logging.info('request json: %s' % str(request.__data__)) elif request.content_type.startswith('application/x-www-form-urlencoded'): request.__data__ = await request.post() logging.info('request form: %s' % str(request.__data__)) return (await handler(request)) return parse_data # 函式返回值轉化為'web.response'物件 async def response_factory(app, handler): async def response(request): logging.info('Response handler...') r = await handler(request) if isinstance(r, web.StreamResponse): return r if isinstance(r, bytes): resp = web.Response(body=r) resp.content_type = 'application/octet-stream' return resp if isinstance(r, str): if r.startswith('redirect:'): # 重定向 return web.HTTPFound(r[9:]) # 轉入別的網站 resp = web.Response(body=r.encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, dict): template = r.get('__template__') if template is None: resp = web.Response(body=json.dumps(r, ensure_ascii=False, default=lambda o: o.__dict__).encode('utf-8')) resp.content_type = 'application/json;charset=utf-8' return resp else: # jinja2模板 resp = web.Response(body=app['__templating__'].get_template(template).render(**r).encode('utf-8')) resp.content_type = 'text/html;charset=utf-8' return resp if isinstance(r, int) and r >= 100 and r < 600: return web.Response(r) if isinstance(r, tuple) and len(r) == 2: t, m = r if isinstance(t, int) and t >= 100 and t < 600: return web.Response(t, str(m)) # default: resp = web.Response(body=str(r).encode('utf-8')) resp.content_type = 'text/plain;charset=utf-8' return resp return response def datetime_filter(t): delta = int(time.time() - t) if delta < 60: return u'1分鐘前' if delta < 3600: return u'%s分鐘前' % (delta // 60) if delta < 86400: return u'%s小時前' % (delta // 3600) if delta < 604800: return u'%s天前' % (delta // 86400) dt = datetime.fromtimestamp(t) return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)
編寫測試程式碼
資料庫服務啟動
開啟cmd命令列,輸入net start mysql
,如果未安裝mysql,參考本部落格。
資料表建立
在cmd命令列輸入:mysql -u root -p
,進而輸入密碼進入資料庫,之後輸入以下命令建立網站的資料表。
drop database if exists awesome; # 建立資料庫 create database awesome; # 使用資料庫 use awesome; # 此處改為自己的主機名和密碼 grant select, insert, update, delete on awesome.* to 'root'@'localhost' identified by 'password'; create table users ( `id` varchar(50) not null, `email` varchar(50) not null, `passwd` varchar(50) not null, `admin` bool not null, `name` varchar(50) not null, `image` varchar(500) not null, `created_at` real not null, unique key `idx_email` (`email`), key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8; create table blogs ( `id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `name` varchar(50) not null, `summary` varchar(200) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8; create table comments ( `id` varchar(50) not null, `blog_id` varchar(50) not null, `user_id` varchar(50) not null, `user_name` varchar(50) not null, `user_image` varchar(500) not null, `content` mediumtext not null, `created_at` real not null, key `idx_created_at` (`created_at`), primary key (`id`) ) engine=innodb default charset=utf8;
app.py部分的程式碼:
async def init(loop):
# 這裡的user和password改成自己的使用者名稱和密碼
await orm.create_pool(loop=loop, host='127.0.0.1', port=3306, user='root', password='www', db='awesome')
app = web.Application(loop=loop, middlewares=[
logger_factory, response_factory
])
init_jinja2(app, filters=dict(datetime=datetime_filter))
# 對接收到的不同型別的瀏覽器請求語言具體處理的程式碼放在'handlers.py'檔案中
add_routes(app, 'handlers')
add_static(app)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 9000)
logging.info('server started at http://127.0.0.1:9000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
Handlers.py部分的程式碼
import asyncio
from coroweb import get,post
#編寫用於測試的URL處理函式
@get('/')
async def handler_url_blog(request):
body='<h1>Awesome</h1>'
return body