Flask-SQLAlchemy 二次開發,使其支援讀寫分離
阿新 • • 發佈:2022-03-07
Flask-SQLAlchemy 二次開發,使其支援讀寫分離
sqlalchemy 並沒有像 django-orm 一樣內建完善的讀寫分離方案, 但是提供了可以自定義的介面: 官方文件, 我們可以藉此對 flask-sqlalchemy 進行二次開發, 實現讀寫分離
from flask import Flask from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state from sqlalchemy import orm app = Flask(__name__) app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:[email protected]:3306/recall" app.config["SQLALCHEMY_BINDS"] = { "master": "mysql+pymysql://root:[email protected]:3306/recall", # 主庫 "slave": "mysql+pymysql://root:[email protected]:3307/recall" # 從庫 } app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False if ENV == "product" else True app.config["SQLALCHEMY_ECHO"] = False if ENV == "product" else True class RoutingSession(SignallingSession): """The signalling session is the default session that Flask-SQLAlchemy uses. It extends the default session system with bind selection and modification tracking. If you want to use a different session you can override the :meth:`SQLAlchemy.create_session` function. .. versionadded:: 2.0 .. versionadded:: 2.1 The `binds` option was added, which allows a session to be joined to an external transaction. 繼承SignallingSession, 重寫`get_bind` 使其支援讀寫分離 """ def __init__(self, *args, **kwargs): super(RoutingSession, self).__init__(*args, **kwargs) def get_bind(self, mapper=None, clause=None): """Return the engine or connection for a given model or table, using the ``__bind_key__`` if it is set. """ # mapper is None if someone tries to just get a connection state = get_state(self.app) if mapper is not None: try: # SA >= 1.3 persist_selectable = mapper.persist_selectable except AttributeError: # SA < 1.3 persist_selectable = mapper.mapped_table info = getattr(persist_selectable, 'info', {}) bind_key = info.get('bind_key') if bind_key is not None: return state.db.get_engine(self.app, bind=bind_key) # 讀寫分離 from sqlalchemy.sql.dml import UpdateBase if self._flushing or isinstance(clause, UpdateBase): print("user master DB") return state.db.get_engine(self.app, bind="master") else: print("user slave DB") return state.db.get_engine(self.app, bind="slave") class RoutingSQLAlchemy(SQLAlchemy): """ 重寫 `create_session` 使其使用`RoutingSession` """ def create_session(self, options): """Create the session factory used by :meth:`create_scoped_session`. The factory **must** return an object that SQLAlchemy recognizes as a session, or registering session events may raise an exception. Valid factories include a :class:`~sqlalchemy.orm.session.Session` class or a :class:`~sqlalchemy.orm.session.sessionmaker`. The default implementation creates a ``sessionmaker`` for :class:`RoutingSession`. :param options: dict of keyword arguments passed to session class """ return orm.sessionmaker(class_=RoutingSession, db=self, **options) db = RoutingSQLAlchemy(app)