1. 程式人生 > 其它 >Flask-SQLAlchemy 二次開發,使其支援讀寫分離

Flask-SQLAlchemy 二次開發,使其支援讀寫分離

Flask-SQLAlchemy 二次開發,使其支援讀寫分離

​ sqlalchemy 並沒有像 django-orm 一樣內建完善的讀寫分離方案, 但是提供了可以自定義的介面: 官方文件, 我們可以藉此對 flask-sqlalchemy 進行二次開發, 實現讀寫分離

from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm

app = Flask(__name__)

app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:[email protected]:3306/recall"
app.config["SQLALCHEMY_BINDS"] = {
    "master": "mysql+pymysql://root:[email protected]:3306/recall",  # 主庫
    "slave": "mysql+pymysql://root:[email protected]:3307/recall"    # 從庫
}

app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False if ENV == "product" else True
app.config["SQLALCHEMY_ECHO"] = False if ENV == "product" else True


class RoutingSession(SignallingSession):
    """The signalling session is the default session that Flask-SQLAlchemy
    uses.  It extends the default session system with bind selection and
    modification tracking.

    If you want to use a different session you can override the
    :meth:`SQLAlchemy.create_session` function.

    .. versionadded:: 2.0

    .. versionadded:: 2.1
        The `binds` option was added, which allows a session to be joined
        to an external transaction.

    繼承SignallingSession, 重寫`get_bind` 使其支援讀寫分離
    """

    def __init__(self, *args, **kwargs):
        super(RoutingSession, self).__init__(*args, **kwargs)

    def get_bind(self, mapper=None, clause=None):
        """Return the engine or connection for a given model or
        table, using the ``__bind_key__`` if it is set.
        """
        # mapper is None if someone tries to just get a connection
        state = get_state(self.app)
        if mapper is not None:
            try:
                # SA >= 1.3
                persist_selectable = mapper.persist_selectable
            except AttributeError:
                # SA < 1.3
                persist_selectable = mapper.mapped_table

            info = getattr(persist_selectable, 'info', {})
            bind_key = info.get('bind_key')
            if bind_key is not None:
                return state.db.get_engine(self.app, bind=bind_key)

        # 讀寫分離
        from sqlalchemy.sql.dml import UpdateBase
        if self._flushing or isinstance(clause, UpdateBase):
            print("user master DB")
            return state.db.get_engine(self.app, bind="master")
        else:
            print("user slave DB")
            return state.db.get_engine(self.app, bind="slave")


class RoutingSQLAlchemy(SQLAlchemy):
    """
    重寫 `create_session` 使其使用`RoutingSession`
    """
    def create_session(self, options):
        """Create the session factory used by :meth:`create_scoped_session`.

        The factory **must** return an object that SQLAlchemy recognizes as a session,
        or registering session events may raise an exception.

        Valid factories include a :class:`~sqlalchemy.orm.session.Session`
        class or a :class:`~sqlalchemy.orm.session.sessionmaker`.

        The default implementation creates a ``sessionmaker`` for :class:`RoutingSession`.

        :param options: dict of keyword arguments passed to session class
        """

        return orm.sessionmaker(class_=RoutingSession, db=self, **options)


db = RoutingSQLAlchemy(app)