1. 程式人生 > 其它 >Sanic十六:Sanic + 非同步orm之SQLAlchemy

Sanic十六:Sanic + 非同步orm之SQLAlchemy

Sanic是非同步庫,想要發揮其強大的效能,當需要使用第三方庫的時候,就需要使用非同步的庫,在python中,非同步orm較為常見的就兩個可,一個SQLAlchemy,一個Tortoise-ORM

SQLAlchemy 在1.4版本之後,已經支援非同步了,既然要用非同步,那同步庫的PyMYSQL肯就就不能滿足了,所以需要用非同步庫aiomysql

SQLAlchemy官網:https://www.sqlalchemy.org/

SQLAlchemy中文網站:https://www.osgeo.cn/sqlalchemy/index.html

aiomysql官網:https://aiomysql.readthedocs.io/en/latest/

安裝SQLAlchemy: pip installSQLAlchemy

安裝aiomysql: pip installaiomysql

使用示例

先確保有資料庫:庫名隨便取,比如test

建立模型

由於沒有sqlalchemy1.4以後的教程太少,沒有找到用aiomysql作為驅動的對映教程,所以這裡將物件對映到資料庫的操作還是用mysql,系統執行中用aiomysql

orm與app繫結,這裡就用非同步的aiomysql了

在中介軟體中定義獲取資料庫session和釋放資源

路由

插入資料

查詢

models

from sqlalchemy import Column, String, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class BaseModel(Base):
__abstract__ = True
id = Column(Integer, primary_key=True, comment='id,主鍵')


class User(BaseModel):
""" 使用者表 """
__tablename__ = "user"
name = Column(String(64), comment='名字')
age = Column(Integer, comment='年齡')

def to_dict(self):
return {"id": self.id, "name": self.name, "age": self.age}

模型對映到資料庫

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from models import Base

# 匯入相應的模組
engine = create_engine("mysql+pymysql://root:123456@localhost/test")

# 建立session物件
session = sessionmaker(engine)()

# 建立表,執行所有BaseModel類的子類
Base.metadata.create_all(engine)

# 提交,必須
session.commit()

業務邏輯

from contextvars import ContextVar

from sanic import Sanic, response
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

from models import User

app = Sanic("SanicAndSqlalchemy")

# 建立非同步資料庫引擎
bind = create_async_engine("mysql+aiomysql://root:123456@localhost/test", echo=True)

_base_model_session_ctx = ContextVar("session")


@app.middleware("request")
async def inject_session(request):
"""
請求中介軟體
建立一個可用的 AsyncSession 物件並且將其繫結至 request.ctx 中,
而 _base_model_session_ctx 也會在這是被賦予可用的值,
如果需要在其他地方使用 session 物件(而非從 request.ctx 中取值),該全域性變數或許能幫助您(它是執行緒安全的)。
"""
request.ctx.session = sessionmaker(bind, AsyncSession, expire_on_commit=False)()
request.ctx.session_ctx_token = _base_model_session_ctx.set(request.ctx.session)


@app.middleware("response")
async def close_session(request, response):
""" 響應中介軟體,將建立的 AsyncSession 關閉,並重置 _base_model_session_ctx 的值,進而釋放資源 """
if hasattr(request.ctx, "session_ctx_token"):
_base_model_session_ctx.reset(request.ctx.session_ctx_token)
await request.ctx.session.close()


@app.post("/user")
async def create_user(request):
session = request.ctx.session
async with session.begin():
user = User(**request.json)
session.add(user)
return response.json(user.to_dict())


@app.get("/user/<user_id:int>")
async def get_user(request, user_id):
session = request.ctx.session
async with session.begin():
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar()
return response.json({'code': 200, 'message': '查詢成功', 'data': user.to_dict() if user else {}})


if __name__ == '__main__':
import uvicorn

uvicorn.run('main:app', host='0.0.0.0', port=8000, debug=True)
討論群:249728408