初識aiohttp非同步框架之服務端用法
typora-copy-images-to: ipic
[TOC]
配置環境
首先檢查你的python
版本:
$ python3 -V
Python 3.6.3
複製程式碼
安裝aiohttp
:
$ pip3 install aiohttp
複製程式碼
檢視aiohttp版本號:
$ python3 -c 'import aiohttp; print(aiohttp.__version__)'
3.0.7
複製程式碼
專案結構與其他基於python的web專案非常相似:
.
├── README.rst
└── polls
├── Makefile
├── README.rst
├── aiohttpdemo_polls
│ ├── __init__.py
│ ├── __main__.py
│ ├── db.py
│ ├── main.py
│ ├── routes.py
│ ├── templates
│ ├── utils.py
│ └── views.py
├── config
│ └── polls.yaml
├── images
│ └── example.png
├── setup.py
├── sql
│ ├── create_tables.sql
│ ├── install.sh
│ └── sample_data.sql
└── static
└── style.css
複製程式碼
開始第一個aiohttp應用
這個教程基於Django的投票應用教程。
應用
所有的aiohttp伺服器都圍繞aiohttp.web.Application例項來構建。用於註冊startup/cleanup訊號,以及連線路由等。
建立一個專案:
vote
├── config
│ └── __init__.py
├── models
│ └── __init__.py
├── static
├── template
└── application
└── __init__.py
複製程式碼
目錄vote下面分別建立了config、models、application、static、template。
這裡我使用pycharm開發,圖示如下:
建立一個應用:
from aiohttp import web
app = web.Application()
web.run_app(app,host='0.0.0.0',port=9000)
複製程式碼
儲存於vote/main.py並啟動伺服器:
$ python3 /Users/junxi/program/vote/main.py
複製程式碼
這裡的vote是專案的根目錄。
你將在命令列中看到如下輸出:
======== Running on http://0.0.0.0:9000 ========
(Press CTRL+C to quit)
複製程式碼
在瀏覽器中開啟http://localhost:9000/
$ curl -X GET http://localhost:9000
複製程式碼
不過,對於全部請求現在只會返回404: Not Found
,讓我們建立一個路由和檢視來展示一些更有意義的東西。
檢視
讓我們從第一個檢視開始。建立application/views.py
並加入如下程式碼:
from aiohttp import web
async def hello(request):
return web.Response(text='Hello Aiohttp!')
複製程式碼
現在我們應該為這個 index
檢視建立一個路由。 將如下程式碼寫入 application/routes.py
(分離檢視,路由,模型是種很好的做法。 因為你可能擁有很多這些元件,放在不同的地方可以方便地管理程式碼):
from .views import hello
def setup_routes(app):
app.router.add_get('/hello',hello)
複製程式碼
此外,我們應該在某個地方呼叫 setup_routes
函式,最好是在 main.py
中呼叫它:
from aiohttp import web
from application.routes import setup_routes
app = web.Application()
setup_routes(app)
web.run_app(app,port=9000)
複製程式碼
再次啟動伺服器. 現在我們開啟瀏覽器就可以看見:
$ curl -X GET localhost:9000/hello
Hello Aiohttp!
複製程式碼
工作目錄應該是像下面這樣:
vote
├── application
│ ├── __init__.py
│ ├── routes.py
│ └── views.py
├── config
│ ├── __init__.py
│ └── settings.py
├── main.py
├── models
│ ├── __init__.py
├── static
└── template
複製程式碼
配置檔案
aiohttp 的配置是不可知的。 這意味著這個庫不需要任何配置方法,並且也沒有內建支援任何配置模式。
但是請考慮下面這些事實:
99% 的伺服器都有配置檔案.
每個產品(除了像 Django 和 Flask 等基於 Python 的解決方案外)都不將配置檔案寫入原始碼。
比如 Nginx 預設將自己的配置檔案儲存在
/etc/nginx
資料夾下。Mongo 將配置檔案存為
/etc/mongodb.conf
。驗證配置檔案是個好主意,充分的檢查可以在產品部署時避免許多愚蠢的錯誤。
因此,我們 建議 使用以下方法:
- 將配置存為
yaml
檔案(json
或ini
格式也不錯,但是yaml
格式是最好的).- 從預定位置載入
yaml
配置。例如./config/app_cfg.yaml
,/etc/app_cfg.yaml
。- 保持可以通過命令列引數覆蓋配置檔案的能力。例如
./run_app --config=/opt/config/app_cfg.yaml
。- 對於載入的字典應用嚴格的檢查。 trafaret,colander or JSON schema 是這型別工作的好候選。
載入配置並在應用中讀取:
# load config from yaml file in current dir
conf = load_config(str(pathlib.Path('.') / 'config' / 'settings.yaml'))
app['config'] = conf
複製程式碼
或者使用py檔案當作配置檔案:
├── config
│ ├── __init__.py
│ └── settings.py
複製程式碼
構建資料庫
資料庫模式
操作MySQL資料庫的工具,之前django專案一直使用本身自帶的orm,tornado專案使用的torndb.py。其他專案則使用的pymysql庫,pymysql庫的用法在這裡。
本文使用MySQL資料庫和aiomysql這個非同步操作MySQL的庫。
安裝aiomysql
需要依賴pymysql
$ pip3 install pymysql
$ pip3 install aiomysql
複製程式碼
我們使用 aiomysql 來描述資料庫模式。
aiomysql官網連線示例
import asyncio
from aiomysql import create_pool
loop = asyncio.get_event_loop()
async def go():
async with create_pool(host='127.0.0.1',port=3306,user='root',password='',db='mysql',loop=loop) as pool:
async with pool.get() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
value = await cur.fetchone()
print(value)
loop.run_until_complete(go())
複製程式碼
aiomysql官網連線池示例
import asyncio
import aiomysql
async def test_example(loop):
pool = await aiomysql.create_pool(host='127.0.0.1',loop=loop)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 42;")
print(cur.description)
(r,) = await cur.fetchone()
assert r == 42
pool.close()
await pool.wait_closed()
loop = asyncio.get_event_loop()
loop.run_until_complete(test_example(loop))
複製程式碼
SQLAlchemy可選整合的示例
這裡不使用sqlalchemy這個orm,原因:遷移功能不怎麼好使,用慣了django的orm,感覺別的不咋好用。寫原生sql練習自己的原生sql編寫能力。
import asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine
metadata = sa.MetaData()
tbl = sa.Table('tbl',metadata,sa.Column('id',sa.Integer,primary_key=True),sa.Column('val',sa.String(255)))
async def go(loop):
engine = await create_engine(user='root',db='test_pymysql',host='127.0.0.1',loop=loop)
async with engine.acquire() as conn:
await conn.execute(tbl.insert().values(val='abc'))
await conn.execute(tbl.insert().values(val='xyz'))
async for row in conn.execute(tbl.select()):
print(row.id,row.val)
engine.close()
await engine.wait_closed()
loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
複製程式碼
建立資料庫表
檢視mysql
版本
$ mysql --version
/usr/local/mysql/bin/mysql Ver 14.14 Distrib 5.7.20,for macos10.12 (x86_64) using EditLine wrapper
複製程式碼
建立一個資料庫vote,並增加授權使用者
$ mysql -uroot -p123456
mysql> CREATE DATABASE IF NOT EXISTS vote CHARACTER SET utf8 COLLATE utf8_general_ci;
mysql> grant all on vote.* to vote identified by '123456';
複製程式碼
建立表user
CREATE TABLE IF NOT EXISTS `user`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '使用者ID',`delete_flag` tinyint(1) NOT NULL DEFAULT 0 COMMENT '刪除標誌',`name` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '暱稱',`phone` VARCHAR(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '電話',`email` VARCHAR(40) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '郵箱',`password` VARCHAR(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '密碼',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',PRIMARY KEY ( `id` ),INDEX `email` (`email`) USING BTREE,INDEX `phone` (`phone`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
複製程式碼
檢視user表結構
+-------------+-------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+-------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| delete_flag | tinyint(1) | NO | | 0 | |
| name | varchar(40) | NO | | NULL | |
| phone | varchar(11) | NO | MUL | NULL | |
| email | varchar(40) | NO | MUL | NULL | |
| password | varchar(16) | NO | | NULL | |
| create_time | datetime | NO | | CURRENT_TIMESTAMP | |
+-------------+-------------+------+-----+-------------------+----------------+
複製程式碼
建立表question
CREATE TABLE IF NOT EXISTS `question`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '問題ID',`user_id` INT(11) NOT NULL COMMENT '使用者ID',`question_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '問題內容',FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,INDEX `user_id` (`user_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
複製程式碼
檢視question表結構
+---------------+--------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| delete_flag | tinyint(1) | NO | | 0 | |
| user_id | int(11) | NO | MUL | NULL | |
| question_text | varchar(200) | NO | | NULL | |
| create_time | datetime | NO | | CURRENT_TIMESTAMP | |
+---------------+--------------+------+-----+-------------------+----------------+
複製程式碼
建立表choice
CREATE TABLE IF NOT EXISTS `choice`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '選擇ID',`choice_text` VARCHAR(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '選擇內容',`votes` INT(11) NOT NULL COMMENT '得票數',`question_id` INT(11) NOT NULL COMMENT '問題ID',FOREIGN KEY (`question_id`) REFERENCES `question` (`id`) ON DELETE RESTRICT ON UPDATE RESTRICT,INDEX `question_id` (`question_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
複製程式碼
檢視choice表結構
+-------------+--------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+--------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| delete_flag | tinyint(1) | NO | | 0 | |
| choice_text | varchar(200) | YES | | NULL | |
| votes | int(11) | NO | | NULL | |
| question_id | int(11) | NO | MUL | NULL | |
| create_time | datetime | NO | | CURRENT_TIMESTAMP | |
+-------------+--------------+------+-----+-------------------+----------------+
複製程式碼
建立連線池
我們需要建立一個全域性的連線池,每個HTTP請求都可以從連線池中直接獲取資料庫連線。使用連線池的好處是不必頻繁地開啟和關閉資料庫連線,而是能複用就儘量複用。
預設情況下將編碼設定為utf8
,自動提交事務:
async def create_pool(loop,**kw):
"""定義mysql全域性連線池"""
logging.info('create database connection pool...')
global _mysql_pool
_mysql_pool = await aiomysql.create_pool(host=DATABASES['host'],port=DATABASES['port'],user=DATABASES['user'],password=DATABASES['password'],db=DATABASES['db'],loop=loop,charset=kw.get('charset','utf8'),autocommit=kw.get('autocommit',True),maxsize=kw.get('maxsize',10),minsize=kw.get('minsize',1))
return _mysql_pool
複製程式碼
封裝增刪改查
Web App裡面有很多地方都要訪問資料庫。訪問資料庫需要建立資料庫連線、遊標物件,然後執行SQL語句,最後處理異常,清理資源。這些訪問資料庫的程式碼如果分散到各個函式中,勢必無法維護,也不利於程式碼複用。
所以,我們要首先把常用的SELECT、INSERT、UPDATE和DELETE操作用函式封裝起來。
由於Web框架使用了基於asyncio的aiohttp,這是基於協程的非同步模型。在協程中,不能呼叫普通的同步IO操作,因為所有使用者都是由一個執行緒服務的,協程的執行速度必須非常快,才能處理大量使用者的請求。而耗時的IO操作不能在協程中以同步的方式呼叫,否則,等待一個IO操作時,系統無法響應任何其他使用者。
這就是非同步程式設計的一個原則:一旦決定使用非同步,則系統每一層都必須是非同步,“開弓沒有回頭箭”。
幸運的是aiomysql
為MySQL資料庫提供了非同步IO的驅動。
要執行SELECT語句,我們用select
函式執行,需要傳入SQL語句和SQL引數:
async def fetchone(sql,args=(),size=None):
"""封裝select,查詢單個,返回資料為字典"""
log(sql,args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql,args)
rs = await cur.fetchone()
return rs
async def select(sql,size=None):
"""封裝select,查詢多個,返回資料為列表"""
log(sql,args)
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
logging.info('rows returned: %s' % len(rs))
return rs
複製程式碼
注意要始終堅持使用帶引數的SQL,而不是自己拼接SQL字串,這樣可以防止SQL注入攻擊。
注意到yield from
將呼叫一個子協程(也就是在一個協程中呼叫另一個協程)並直接獲得子協程的返回結果。
如果傳入size
引數,就通過fetchmany()
獲取最多指定數量的記錄,否則,通過fetchall()
獲取所有記錄。
Insert,Update,Delete
要執行INSERT、UPDATE、DELETE語句,可以定義一個通用的execute()
函式,因為這3種SQL的執行都需要相同的引數,以及返回一個整數表示影響的行數:
async def execute(sql,args=()):
"""封裝insert,delete,update"""
log(sql,args)
async with _mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(sql,args)
except BaseException:
await conn.rollback()
return
else:
affected = cur.rowcount
return affected
複製程式碼
execute()
函式和select()
函式所不同的是,cursor物件不返回結果集,而是通過rowcount
返回結果數。
這三個函式定義在models資料夾下的db.py中(db.py是新建立的檔案):
完整程式碼如下:
import logging
logging.basicConfig(level=logging.INFO)
import aiomysql
import aioredis
from config.settings import DATABASES,CACHES
def log(sql,args=()):
logging.info('SQL: %s' % sql,*args)
async def create_pool(loop,1))
return _mysql_pool
async def fetchone(sql,args)
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
logging.info('rows returned: %s' % len(rs))
return rs
async def execute(sql,args)
except BaseException:
await conn.rollback()
return
else:
affected = cur.rowcount
return affected
複製程式碼
把執行SQL的函式匯入到models/init.py檔案中,方便別的模組引用:
from .db import *
__all__ = ['create_pool','select','execute','fetchone']
複製程式碼
把我們建立表的sql語句儲存到models/create_table.sql檔案中:
CREATE TABLE IF NOT EXISTS `user`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '使用者ID',INDEX `phone` (`phone`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
CREATE TABLE IF NOT EXISTS `question`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '問題ID',INDEX `user_id` (`user_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
CREATE TABLE IF NOT EXISTS `choice`(
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '選擇ID',INDEX `question_id` (`question_id`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=DYNAMIC
;
複製程式碼
models目錄結構:
models/
├── __init__.py
└── db.py
複製程式碼
編寫配置檔案
之前我們說過的配置檔案,我使用py檔案當作配置檔案,conf/settings.py內容如下:
DATABASES = {
'engine': 'mysql','db': 'vote','user': 'vote','password': '123456','host': 'localhost','port': 3306,}
複製程式碼
插入模擬資料
INSERT INTO user(name,phone,email,password) VALUES('露西','16666666661','[email protected]','123456'),('南希','16666666662','[email protected]',('雪靈','16666666663','[email protected]','123456');
複製程式碼
INSERT INTO question(question_text,user_id) VALUES('最受歡迎的計算機語言?',1),('最受歡迎的水果?',2),('男人最喜歡女人什麼地方?',3);
複製程式碼
INSERT INTO choice(choice_text,question_id,votes) VALUES('python',1,3),('java',('go',1);
INSERT INTO choice(choice_text,votes) VALUES('香蕉',2,('蘋果',('草莓',votes) VALUES('漂亮臉蛋',3,('大胸',('大長腿',1);
複製程式碼
基礎檢視類
aiohttp.web
提供django風格的基礎試圖類。
你可以從 View
類中繼承,並自定義http請求的處理方法:
from aiohttp import web
from models import select
import json
import datetime
import decimal
class RewriteJsonEncoder(json.JSONEncoder):
"""重寫json類,為瞭解決datetime型別的資料無法被json格式化"""
def default(self,obj):
if isinstance(obj,datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj,datetime.date):
return obj.strftime("%Y-%m-%d")
elif isinstance(obj,decimal.Decimal):
return str(obj)
elif hasattr(obj,'isoformat'):
# 處理日期型別
return obj.isoformat()
else:
return json.JSONEncoder.default(self,obj)
def json_dumps(obj):
return json.dumps(obj,cls=RewriteJsonEncoder)
async def hello(request):
return web.Response(text='Hello Aiohttp!')
class QuestionChoices(web.View):
"""檢視一個問題的可選答案"""
async def get(self):
question_id = self.request.match_info.get('question_id')
result = await select(self.request.app['db'],'select * from choice where question_id = %s',(question_id,))
return web.json_response(data=result,dumps=json_dumps)
複製程式碼
定義路由:
from .views import hello,QuestionChoices
def setup_routes(app):
app.router.add_get('/hello',hello,name='hello')
app.router.add_route('*','/question/{question_id}/choice',QuestionChoices)
複製程式碼
開啟瀏覽器或輸入下面命令訪問:
$ curl -X GET http://127.0.0.1:9000/question/1/choice
[{"id": 1,"delete_flag": 0,"choice_text": "python","votes": 3,"question_id": 1,"create_time": "2018-04-15 19:47:16"},{"id": 2,"choice_text": "java","votes": 2,{"id": 3,"choice_text": "go","votes": 1,"create_time": "2018-04-15 19:47:16"}]j
複製程式碼
之前使用django比較多,個人喜歡使用類檢視。
裝飾器檢視
路由裝飾器有點像Flask風格:
routes = web.RouteTableDef()
@routes.get('/get')
async def handle_get(request):
...
@routes.post('/post')
async def handle_post(request):
...
app.router.add_routes(routes)
複製程式碼
首先是要建立一個 aiohttp.web.RouteTableDef
物件。
該物件是一個類列表物件,額外提供aiohttp.web.RouteTableDef.get()
,aiohttp.web.RouteTableDef.post()
這些裝飾器來註冊路由。
最後呼叫add_routes()
新增到應用的路由裡。
靜態檔案
處理靜態檔案( 圖片,JavaScripts,CSS檔案等)最好的方法是使用反向代理,像是nginx或CDN服務。
但就開發來說,aiohttp
伺服器本身可以很方便的處理靜態檔案。
只需要通過 UrlDispatcher.add_static()
註冊個新的靜態路由即可:
app.router.add_static('/static',path_to_static_folder)
複製程式碼
當訪問靜態檔案的目錄時,預設伺服器會返回 HTTP/403 Forbidden(禁止訪問)。 使用show_index
並將其設定為True
可以顯示出索引:
app.router.add_static('/static',path_to_static_folder,show_index=True)
複製程式碼
當從靜態檔案目錄訪問一個符號連結(軟連結)時,預設伺服器會響應 HTTP/404 Not Found(未找到)。使用follow_symlinks
並將其設定為True
可以讓伺服器使用符號連結:
app.router.add_static('/static',follow_symlinks=True)
複製程式碼
如果你想允許快取清除,使用append_version
並設為True
。
快取清除會對資原始檔像JavaScript 和 CSS檔案等的檔名上新增一個hash後的版本。這樣的好處是我們可以讓瀏覽器無限期快取這些檔案而不用擔心這些檔案是否釋出了新版本。
app.router.add_static('/static',append_version=True)
複製程式碼
這裡我們新增一個靜態檔案的路由
首先在配置檔案conf/settings.py中指定專案、靜態檔案、模版HTML路徑:
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 專案路徑
STATIC_DIR = os.path.join(BASE_DIR,'static') # 靜態檔案路徑
TEMPLATE_DIR = os.path.join(BASE_DIR,'template') # 模版HTML路徑
複製程式碼
接下里在application/routes.py檔案中新增一個靜態檔案路由:
def setup_static_routes(app):
app.router.add_static('/static/',path=STATIC_DIR,name='static')
複製程式碼
下載uikit的靜態檔案到static目錄下:
static
├── css
│ ├── uikit-rtl.css
│ ├── uikit-rtl.min.css
│ ├── uikit.css
│ └── uikit.min.css
└── js
├── uikit-icons.js
├── uikit-icons.min.js
├── uikit.js
└── uikit.min.js
複製程式碼
把新增靜態路由的函式新增到application/main.py檔案的init函式中:
async def init(loop):
mysql_pool = await create_pool(loop)
app = web.Application(loop=loop)
app['db'] = mysql_pool
setup_routes(app)
setup_static_routes(app)
return app
複製程式碼
重啟伺服器訪問http://127.0.0.1:9000/static/js/bootstrap.js
$ curl -X GET http://127.0.0.1:9000/static/js/bootstrap.js
/*!
* Bootstrap v4.0.0 (https://getbootstrap.com)
* Copyright 2011-2018 The Bootstrap Authors (https://github.com/twbs/bootstrap/graphs/contributors)
* Licensed under MIT (https://github.com/twbs/bootstrap/blob/master/LICENSE)
*/
。。。。。
。。。。。
複製程式碼
可以正常訪問,靜態路由已經新增成功了。
模版
aiohttp.web
並不直接提供模板讀取,不過可以使用第三方庫 aiohttp_jinja2
,該庫是由aiohttp
作者維護的。
使用起來也很簡單。首先我們用aiohttp_jinja2.setup()
來設定下jinja2
環境。
安裝aiohttp_jinja2:
$ pip3 install aiohttp_jinja2
複製程式碼
在application/routes.py檔案中新增一個模版檔案路由:
from config.settings import STATIC_DIR,TEMPLATE_DIR
def setup_template_routes(app):
aiohttp_jinja2.setup(app,loader=jinja2.FileSystemLoader(TEMPLATE_DIR))
複製程式碼
把新增模版路由的函式新增到vote/main.py檔案的init函式中:
from application.routes import setup_routes,setup_static_routes,setup_template_routes
async def init(loop):
mysql_pool = await create_pool(loop)
app = web.Application(loop=loop)
app['db'] = mysql_pool
setup_routes(app)
setup_static_routes(app)
setup_template_routes(app)
return app
複製程式碼
增加pycharm普通專案對jinja2模版的支援,編輯.idea/vote.iml,在component標籤的同級新增如下內容:
<component name="TemplatesService">
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
<option name="TEMPLATE_FOLDERS">
<list>
<option value="$MODULE_DIR$/template" />
</list>
</option>
</component>
複製程式碼
新建一個模版HTML檔案儲存到template/index.html中,內容如下:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1,shrink-to-fit=no">
{% block title %}
<title>首頁</title>
{% endblock %}
<link rel="stylesheet" href="/static/css/uikit.min.css">
<link rel="stylesheet" href="/static/css/base.css">
<script src="/static/js/uikit.min.js"></script>
</head>
<body>
<nav class="uk-navbar-container uk-margin" uk-navbar>
<div class="uk-navbar-left">
<a class="uk-navbar-item uk-logo" href="#">Primumest</a>
<ul class="uk-navbar-nav">
<li class="uk-active"><a href="#">首頁</a></li>
<li>
<a href="#">程式語言</a>
<div class="uk-navbar-dropdown">
<ul class="uk-nav uk-navbar-dropdown-nav">
<li class="uk-active"><a href="#">python</a></li>
<li><a href="#">go</a></li>
<li><a href="#">c</a></li>
</ul>
</div>
</li>
<li><a href="#">問答</a></li>
</ul>
</div>
<div class="uk-navbar-right content">
<div class="uk-navbar-item">
<form class="uk-search uk-search-default">
<a href="" class="uk-search-icon-flip" uk-search-icon></a>
<input class="uk-input uk-form-width-smal" type="search" placeholder="Search">
</form>
</div>
<ul class="uk-navbar-nav">
<li class="uk-active"><a href="#">登入</a></li>
<li><a href="{{ url('register') }}">註冊</a></li>
</ul>
</div>
</nav>
{% block content %}
{% endblock %}
</body>
</html>
複製程式碼
新建註冊頁面儲存到template/register.html中,內容如下:
{% extends "index.html" %}
{% block title %}
<title>註冊</title>
{% endblock %}
{% block content %}
<div class="uk-container content">
<form class="uk-form register-box">
<fieldset style="width: 30%">
<legend>註冊賬號</legend>
<div class="uk-form-row">
<span class="uk-form-label">暱稱 </span>
<input type="text" name="name" placeholder="請輸入你的名字" class="uk-form-width-medium uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">郵箱 </span>
<input type="text" name="email" placeholder="請輸入你的郵箱" class="uk-form-width-medium uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">手機 </span>
<input type="text" name="phone" placeholder="請輸入你的手機號" class="uk-form-width-medium uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">密碼 </span>
<input type="password" name="password" placeholder="請輸入你的密碼" class="uk-form-width-medium uk-form-small">
</div>
<button type="submit" class="uk-button-primary">提交</button>
</fieldset>
</form>
</div>
{% endblock %}
複製程式碼
頁面用到了jinja2模版的語法。
建立檢視函式用來訪問這個模版檔案:
@aiohttp_jinja2.template('index.html')
async def index(request):
return
@aiohttp_jinja2.template('register.html')
async def register(request):
return
複製程式碼
建立與之對應的路由:
def setup_routes(app):
app.router.add_get('/hello',name='hello')
app.router.add_get('/',index,name='index')
app.router.add_get('/register',register,name='register')
app.router.add_route('*',QuestionChoices,name='QuestionChoices')
複製程式碼
重啟伺服器,瀏覽器訪問http://127.0.0.1:9000
瀏覽器訪問http://127.0.0.1:9000/register
除錯工具箱
開發aiohttp.web
應用專案時,aiohttp_debugtoolbar
是非常好用的一個除錯工具。
可使用pip進行安裝:
$ pip3 install aiohttp_debugtoolbar
複製程式碼
之後將aiohttp_debugtoolba
r中介軟體新增到aiohttp.web.Applicaiton
中並呼叫aiohttp_debugtoolbar.setup()
來部署:
import aiohttp_debugtoolbar
from aiohttp_debugtoolbar import toolbar_middleware_factory
app = web.Application(middlewares=[toolbar_middleware_factory])
aiohttp_debugtoolbar.setup(app)
複製程式碼
這裡是我們的配置:
import asyncio
import aiohttp_debugtoolbar
from aiohttp import web
from application.routes import setup_routes,setup_template_routes
from models import create_pool
from aiohttp_debugtoolbar import toolbar_middleware_factory
async def init(loop):
mysql_pool = await create_pool(loop)
app = web.Application(loop=loop,middlewares=[toolbar_middleware_factory])
app['db'] = mysql_pool
aiohttp_debugtoolbar.setup(app)
setup_routes(app)
setup_static_routes(app)
setup_template_routes(app)
return app
複製程式碼
瀏覽器輸入地址http://127.0.0.1:9000/_debugtoolbar
可以看到如下頁面:
開發工具
aiohttp-devtools
提供幾個簡化開發的小工具。
可以使用pip安裝:
$ pip3 install aiohttp-devtools
* ``runserver`` 提供自動過載,實時過載,靜態檔案服務和aiohttp_debugtoolbar_integration。
* ``start`` 是一個幫助做繁雜且必須的建立'aiohttp.web'應用的命令。
複製程式碼
這是我們的專案啟動的例子:
$ adev runserver -v main.py --app-factory init -p 9000 --debug-toolbar --host localhost
複製程式碼
這個adev著實難用,我們定義的init函式是個協程函式,但是它命令--app-factory要求必須是個普通函式,並且返回一個aiohttp.web.Application。由於我們要使用資料庫連線池,必須使用await協程語法。所以我放棄使用這個東西了。
建立和執行本地應用的檔案和指南請看aiohttp-devtools
。
下面準備編寫註冊、登入的邏輯了,這裡先使用session會話機制。以後使用oauth2.0的token認證機制。
處理session會話
你經常想要一個可以通過請求儲存使用者資料的倉庫。一般簡稱為會話。
aiohttp.web
沒有內建會話,不過你可以使用第三方庫aiohttp_session
來提供會話支援。
官網例子:
import asyncio
import aioredis
import time
from aiohttp import web
from aiohttp_session import setup,get_session
from aiohttp_session.redis_storage import RedisStorage
async def handler(request):
session = await get_session(request)
last_visit = session['last_visit'] if 'last_visit' in session else None
session['last_visit'] = time.time()
text = 'Last visited: {}'.format(last_visit)
return web.Response(text=text)
async def make_redis_pool():
redis_address = ('127.0.0.1','6379')
return await aioredis.create_redis_pool(redis_address,timeout=1)
def make_app():
loop = asyncio.get_event_loop()
redis_pool = loop.run_until_complete(make_redis_pool())
storage = RedisStorage(redis_pool)
async def dispose_redis_pool(app):
redis_pool.close()
await redis_pool.wait_closed()
app = web.Application()
setup(app,storage)
app.on_cleanup.append(dispose_redis_pool)
app.router.add_get('/',handler)
return app
web.run_app(make_app())
複製程式碼
安裝aiohttp_session
:
$ pip3 install aiohttp_session
複製程式碼
session儲存使用redis,這裡使用aioredis連線redis。
安裝aioredis:
$ pip3 install aioredis
複製程式碼
建立redis全域性連線池與redis命令簡單封裝,編輯models/db.py:
import aioredis
from config.settings import DATABASES,CACHES
async def create_redis_pool(loop):
"""定義redis全域性連線池"""
logging.info('create redis connection pool...')
global _reids_pool
_reids_pool = await aioredis.create_pool(address=CACHES['address'],db=CACHES['db'],password=CACHES['password'],minsize=CACHES['minsize'],maxsize=CACHES['maxsize'],loop=loop)
return _reids_pool
async def cache_set(*args,**kwargs):
"""redis set 命令封裝"""
with await aioredis.commands.Redis(_reids_pool) as redis:
await redis.set(*args,**kwargs)
async def cache_get(*args,**kwargs):
"""redis get 命令封裝"""
with await aioredis.commands.Redis(_reids_pool) as redis:
return await redis.get(*args,**kwargs)
async def cache_del(*args,**kwargs):
"""redis del 命令封裝"""
with await aioredis.commands.Redis(_reids_pool) as redis:
return await redis.delete(*args,**kwargs)
複製程式碼
CACHES
在我們config/settings.py裡面定義:
CACHES = {
'engine': 'redis','address': ('localhost',6379),'password': None,'db': None,'minsize': 1,'maxsize': 10
}
複製程式碼
把執行redis命令的函式匯入到models/init.py檔案中,方便別的模組引用:
from .db import *
__all__ = ['create_pool','fetchone','create_redis_pool','cache_set','cache_get','cache_del']
複製程式碼
註冊頁面:
{% extends "index.html" %}
{% block title %}
<title>註冊</title>
{% endblock %}
{% block head_js %}
{% endblock %}
{% block content %}
<div class="uk-container content">
<form class="uk-form register-box" method="post" action="{{ url('Register') }}">
<fieldset style="width: 25%; padding: 1rem 0 1rem 5rem">
<legend style="text-align: center">註冊賬號</legend>
<div class="uk-form-row">
<span class="uk-form-label">暱稱 </span>
<input type="text" name="name" placeholder="請輸入你的名字" class="uk-width-1-2 uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">郵箱 </span>
<input type="text" name="email" placeholder="請輸入你的郵箱" class="uk-width-1-2 uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">手機 </span>
<input type="text" name="phone" placeholder="請輸入你的手機號" class="uk-width-1-2 uk-form-small">
</div>
<div class="uk-form-row">
<span class="uk-form-label">密碼 </span>
<input type="password" name="password" placeholder="請輸入你的密碼" class="uk-width-1-2 uk-form-small">
</div>
<button type="submit" class="uk-button-primary">提交</button>
</fieldset>
</form>
</div>
{% endblock %}
複製程式碼
註冊檢視函式:
class Register(web.View):
"""a view handler for register page"""
@aiohttp_jinja2.template('register.html')
async def get(self):
return
async def post(self):
data = await self.request.post()
user = await fetchone('select id from user where email = %s or phone = %s',(data.get('email'),data.get('phone')))
# print(await self.request.multipart())
if user:
msg = {'error_code': 20001,'error_msg': 'The email or phone has been registered'}
else:
params = (data.get('name'),data.get('email'),data.get('phone'),data.get('password'))
result = await fetchone('INSERT INTO user(name,password) VALUES(%s,%s,%s)',params)
if result:
msg = {'error_code': 0,'error_msg': 'ok'}
else:
msg = {'error_code': 20002,'error_msg': 'Please try again if registration fails'}
# return web.json_response(data=msg,dumps=json_dumps)
return web.json_response(data=msg,dumps=json_dumps)
複製程式碼
登入頁面:
{% extends "index.html" %}
{% block title %}
<title>登入</title>
{% endblock %}
{% block head_js %}
{% endblock %}
{% block content %}
<div class="uk-container content">
<form class="uk-form register-box uk-text-center" method="post" action="{{ url('Login') }}" style="margin-top: 2rem;">
<div class="uk-form-row">
<input type="text" name="account" placeholder="請輸入郵箱或手機號" class="uk-width-1-5 uk-form-small">
</div>
<div class="uk-form-row">
<input type="password" name="password" placeholder="請輸入你的密碼" class="uk-width-1-5 uk-form-small">
</div>
<button type="submit" class="uk-width-1-5 uk-button-primary uk-button-small">提交</button>
{% if msg %}
<p class="uk-text-danger">{{ msg.error_msg }}</p>
{% endif %}
</form>
</div>
{% endblock %}
{% block bottom_js %}
{% endblock %}
複製程式碼
登入檢視函式:
class Login(web.View):
"""a view handler for login page"""
async def get(self):
return aiohttp_jinja2.render_template('login.html',self.request,locals())
async def post(self):
data = await self.request.post()
account = data.get('account')
password = data.get('password')
columns = 'id,name,password'
if len(account) == 11 and re.match(r'^1[35678]\d{9}',account):
user = await fetchone('select {} from user where phone = %s'.format(columns),(account,))
elif re.match(r'^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$',account):
user = await fetchone('select {} from user where email = %s'.format(columns),))
else:
msg = {'error_code': 20003,'error_msg': 'User does not exists'}
return aiohttp_jinja2.render_template('login.html',locals())
if password != user.get('password'):
msg = {'error_code': 20004,'error_msg': 'Password mismatch'}
return aiohttp_jinja2.render_template('login.html',locals())
session = await get_session(self.request)
session['uid'] = user.get('id')
# sessionid = session.identity
return web.Response(status=302,headers={'location': '/'})
複製程式碼
給首頁檢視函式增加個驗證登入到裝飾器:
from aiohttp_session import get_session
from functools import wraps
def login_required(func): # 使用者登入狀態校驗
"""This function applies only to class views."""
@wraps(func)
async def inner(cls,*args,**kwargs):
session = await get_session(cls.request)
uid = session.get("uid")
if uid:
user = await fetchone('select id,phone from user where id = %s',(uid,))
cls.request.app.userdata = user
return await func(cls,**kwargs)
else:
return web.Response(status=302,headers={'location': '/login'})
return inner
class Index(web.View):
"""a view handler for home page"""
@login_required
async def get(self):
# response.headers['Content-Language'] = 'utf-8'
return aiohttp_jinja2.render_template('index.html',locals())
複製程式碼
這裡我把檢視處理函式全部改為類檢視方式編寫了。
增加路由:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'junxi'
import aiohttp_jinja2
import jinja2
import uuid
from application.views import Hello,Index,Register,Login,Questions,hash_sha256
from config.settings import STATIC_DIR,TEMPLATE_DIR
from aiohttp_session import setup
from aiohttp_session.redis_storage import RedisStorage
def setup_session(app,redis_pool):
storage = RedisStorage(redis_pool=redis_pool,cookie_name='sessionid',key_factory=lambda: hash_sha256(uuid.uuid4().hex))
setup(app,storage)
def setup_routes(app):
app.router.add_view('/hello',Hello,name='Hello')
app.router.add_view('',name='Index')
app.router.add_view('/register',name='Register')
app.router.add_view('/login',name='Login')
app.router.add_view('/questions/{question_id}/choice',name='QuestionChoices'
複製程式碼
main.py
增加session處理:
async def init(loop):
mysql_pool = await create_pool(loop)
redis_pool = await create_redis_pool(loop)
# app = web.Application(loop=loop,middlewares=[toolbar_middleware_factory])
# aiohttp_debugtoolbar.setup(app)
async def dispose_mysql_pool():
mysql_pool.close()
await mysql_pool.wait_closed()
async def dispose_redis_pool():
redis_pool.close()
await redis_pool.wait_closed()
async def dispose_pool(app):
await dispose_mysql_pool()
await dispose_redis_pool()
app = web.Application(loop=loop)
setup_session(app,redis_pool)
setup_routes(app)
setup_static_routes(app)
setup_template_routes(app)
app.on_cleanup.append(dispose_pool)
return app
複製程式碼
重新啟動伺服器,輸入地址http://127.0.0.1:9000/
, 會跳轉到登入頁面:
輸入賬號密碼登入:
跳轉到首頁,可以看到右上角顯示暱稱,已經登入成功了。
增加問答頁面:
{% extends "index.html" %}
{% block title %}
<title>問答</title>
{% endblock %}
{% block head_js %}
{% endblock %}
{% block content %}
<div class="uk-container content">
<div class="uk-child-width-1-2@s" uk-grid>
{% for question in questions %}
<div>
<div class="uk-dark uk-background-muted uk-padding">
<h3 class="uk-text-danger">{{ question.question_text }}</h3>
{% for i in question.question_choice|choice_split %}
<p><label><input class="uk-radio" type="radio" name="radio2" value="{{ i.0 }}"> {{ i.1 }}</label></p>
{% endfor %}
<button class="uk-button-primary uk-button-small">提交</button>
</div>
</div>
{% endfor %}
</div>
</div>
{% endblock %}
{% block bottom_js %}
{% endblock %}
複製程式碼
增加問答檢視函式:
class Questions(web.View):
"""a view handler for look at all questions"""
@login_required
async def get(self):
questions = await select('select q.id as qid,q.question_text,(select group_concat(concat_ws("|",c.id,c.choice_text)) from choice c where c.question_id = q.id) as question_choice from question q;')
return aiohttp_jinja2.render_template('questions.html',locals())
複製程式碼
增加路由以及我們自定義的jinja2模版上下文處理函式:
import aiohttp_jinja2
import jinja2
import uuid
from application.views import Hello,name='QuestionChoices')
app.router.add_view('/questions',name='Questions')
def setup_static_routes(app):
app.router.add_static('/static/',name='static')
def setup_template_routes(app):
aiohttp_jinja2.setup(app,filters={'choice_split': choice_split},loader=jinja2.FileSystemLoader(TEMPLATE_DIR))
def choice_split(choices):
for i in choices.split(','):
single = i.split('|')
yield single
複製程式碼
重啟服務後檢視問答頁面http://127.0.0.1:9000/questions
專案展示
這是完整程式碼。
supervisor部署專案
安裝supervisor:
mkdir ~/supervisor
cd ~/supervisor/
wget https://files.pythonhosted.org/packages/44/60/698e54b4a4a9b956b2d709b4b7b676119c833d811d53ee2500f1b5e96dc3/supervisor-3.3.4.tar.gz
tar zxf supervisor-3.3.4.tar.gz
cd supervisor-3.3.4
sudo python setup.py install
supervisord -v
複製程式碼
生成配置檔案:
$ echo_supervisord_conf > supervisord.conf
複製程式碼
啟動:
$ supervisord -c supervisord.conf
複製程式碼
檢視 supervisord 是否在執行:
$ ps aux|grep supervisord
junxi 5064 0.0 0.0 4267768 900 s000 S+ 10:37上午 0:00.00 grep --color supervisord
junxi 5059 0.0 0.0 4344312 2196 ?? Ss 10:37上午 0:00.01 /usr/bin/python /usr/local/bin/supervisord -c supervisord.conf
複製程式碼
開啟配置檔案:
vim supervisord.conf
複製程式碼
建立aio目錄:
mkdir aio
複製程式碼
在配置檔案底部,配置include
[include]
files = aio/*.conf
複製程式碼
其他引數配置:
# grep -Ev '^;|^$' supervisord.conf
[unix_http_server]
file=/var/log/supervisor/supervisor.sock ; the path to the socket file
[inet_http_server] ; inet (TCP) server disabled by default
port=127.0.0.1:9001 ; ip_address:port specifier,*:port for all iface
username=user ; default is no username (open server)
password=123 ; default is no password (open server)
[supervisord]
logfile=/var/log/supervisor/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10 ; # of main logfile backups; 0 means none,default 10
loglevel=info ; log level; default info; others: debug,warn,trace
pidfile=/var/log/supervisor/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false ; start in foreground if true; default false
minfds=1024 ; min. avail startup file descriptors; default 1024
minprocs=200 ; min. avail process descriptors;default 200
childlogdir=/var/log/supervisor ; 'AUTO' child log dir,default $TEMP
[include]
files = /Users/junxi/supervisor/aio/*.conf
複製程式碼
在aio資料夾下新建vote.conf檔案用於啟動我們的vote專案,內容如下:
# vim aio/vote.conf
[program:vote]
numprocs = 4
numprocs_start = 1
process_name = vote_910%(process_num)s
command=python3 /Users/junxi/program/vote/main.py --port=910%(process_num)s
directory=/Users/junxi/program/vote
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/vote/access.log
loglevel=info
複製程式碼
建立存放日誌的資料夾:
$ sudo mkdir /var/log/supervisor
$ sudo chown -R junxi:admin /var/log/supervisor
$ sudo mkdir /var/log/vote/
$ sudo chown -R junxi:admin /var/log/vote/
複製程式碼
重啟supervisor:
$ kill -Hup `ps -ef|grep supervisord|awk 'NR==1{print $2}'`
複製程式碼
或者手動找到pid重啟。
使用客戶端supervisorctl管理程式的啟動
連線到服務端:
$ supervisorctl -c supervisord.conf
複製程式碼
輸入預設的賬戶user
,密碼123
進入命令列。
檢視狀態:
supervisor> help
default commands (type help <topic>):
=====================================
add exit open reload restart start tail
avail fg pid remove shutdown status update
clear maintail quit reread signal stop version
supervisor> status
vote:vote_9101 STOPPED Apr 17 11:00 PM
vote:vote_9102 STOPPED Apr 17 11:00 PM
vote:vote_9103 STOPPED Apr 17 11:00 PM
vote:vote_9104
複製程式碼
啟動vote:
supervisor> start all
vote:vote_9101: started
vote:vote_9102: started
vote:vote_9103: started
vote:vote_9104: started
複製程式碼
瀏覽器輸入 http://127.0.0.1:9001/
開啟web頁面檢視supervisor狀態,就是我們配置檔案中的inet_http_server。
瀏覽器輸入4個埠(分別為9101、9102、9103、9104)分別進行訪問測試:
然後再使用nginx做個負載均衡:
proxy_next_upstream error;
upstream votes {
server 127.0.0.1:9101;
server 127.0.0.1:9102;
server 127.0.0.1:9103;
server 127.0.0.1:9104;
}
server {
listen 8008;
server_name localhost;
access_log /var/log/nginx/vote/access.log;
error_log /var/log/nginx/vote/error.log;
proxy_read_timeout 200;
location /static/ {
alias /Users/junxi/program/vote/static/;
}
location / {
proxy_pass_header Server;
proxy_set_header Host $http_host;
proxy_redirect off;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Scheme $scheme;
proxy_pass http://votes;
}
}
複製程式碼
別忘了設定Nginx的worker_rlimit_nofile、worker_connections、worker_processes。
訪問http://localhost:8008/hello
Nice。
先寫到這裡了。