1. 程式人生 > 其它 >使用SQLAlchemy進行ORM操作

使用SQLAlchemy進行ORM操作

一、參考文件

http://www.pythondoc.com/flask-sqlalchemy/quickstart.html (官方文件)

https://www.jianshu.com/p/f454a1aa760c

二、ORM解釋

Object (code) - Relational (Database) - Mapping

資料庫的表(table) --> 類(class)

記錄(record,行資料)--> 物件(object)

欄位(field)--> 物件的屬性(attribute)

三、封裝工作

1.Database → Class

使用以下指令生成models檔案

sqlacodegen mysql+pymysql://使用者名稱:密碼@主機:埠號/資料庫名 > 資料庫物件存放路徑

2.資料庫連線

import pymysql

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

class DBSession(object):
    def __init__(self, db_name=database):
        engine = create_engine('mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(user, passwd, host, port, db_name),
                               echo
=False) Session = sessionmaker() Session.configure(bind=engine) self.session = Session(autocommit=True) # 可以這樣用 db = DBSession() db.session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: 1, AppNotTest.updatedBy:
'tester'}, synchronize_session=False) db.session.flush() # 資料提交 db.session.close() # 資料庫連線關閉

3.裝飾器組裝

def db_fixture(db_name='ibupltplatformdb'):
    class DBFixture():
        def __init__(self):
            self.db_name = db_name

        def __db_conn(self):
            self.db = DBSession(self.db_name)
            self.db_session = self.db.session
            return self.db_session

        def __db_finalizer(self):
            self.db_session.flush()
            self.db_session.close()

        def __call__(self, func):
            def wrapper(*args, **kwargs):
                session = self.__db_conn()
                try:
                    res = func(session=session, *args, **kwargs)
                    return res
                except Exception as ex:
                    logging.error('[DB ERROR] {}'.format(ex))
                    session.rollback()
                finally:
                    self.__db_finalizer()

            return wrapper

    return DBFixture()

四、使用

1.查

@db_fixture(db_name='platformdb')
def query_app_not_test_by_id(session, app_id):
    """
    1. 查:根據app_id, 等同於SELECT * FROM app_not_test WHERE app_id= ..;
    :param session: 由裝飾器傳,呼叫該方法時無需傳此引數
    :param app_id: 呼叫該方法時傳此引數
    :return:
    """
    record = session.query(AppNotTest).filter(AppNotTest.appId == app_id).scalar()        #scalar():返回結果小於等於1
    return record


@db_fixture()
def query_app_not_test_by_app_list(session, app_list):
    # 2. 批量查詢
    records = session.query(AppNotTest).filter(AppNotTest.appId.in_(app_list)).all()
    return records


@db_fixture()
def query_app_not_test_with_group(session, app_id):
    # 3. 聯表查詢
    record = session.query(AppNotTest.appId.label('app'), Appgroup.groupname.label('group')).\
        filter(AppNotTest.appId == app_id, AppNotTest.appId == Appgroup.appid).all()
    return record


@db_fixture()
def query_distinct_group_name(session):
    # 4. group by使用
    records = session.query(Appgroup.groupname, func.count(func.distinct(Appgroup.groupname))).group_by(Appgroup.groupname).all()
    return records


@db_fixture()
def query_app_not_test_by_pagination(session, limit, offset):
    # 5. 分頁查詢
    records = session.query(AppNotTest).order_by(AppNotTest.datachange_lasttime.desc()).offset(offset).limit(limit).all()
    return records


@db_fixture()
def query_by_sql(session, app_id):
    # 6. sql查詢,結果是元組而非object
    sql = 'SELECT * FROM app_not_test WHERE appId = :app_id'
    records = session.execute(text(sql), {'app_id': app_id}).fetchall()
    return records  

2. 增

@db_fixture()
def add_app_not_test(session, app_id, status):
    record = app_not_test_assembler(app_id, status)
    session.add(record)


def app_not_test_assembler(app_id, status):
    app_not_test = AppNotTest()
    app_not_test.appId = app_id
    app_not_test.status = status
    app_not_test.createdBy = 'tester'
    app_not_test.updatedBy = 'tester'

    return app_not_test

3.改

@db_fixture()
def update_app_not_test_status(session, app_id, status):
    session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: status},
                                                                        synchronize_session=False)

4.刪

@db_fixture()
def delete_app_not_test_by_app_id(session, app_id):
    session.query(AppNotTest).filter(AppNotTest.appId == app_id).delete(synchronize_session='fetch')

五、其它

1、在使用create_engine建立引擎時,如果預設不指定連線池設定的話,一般情況下,SQLAlchemy會使用一個QueuePool繫結在新建立的引擎上。並附上合適的連線池引數。

2、在這種情況下,當你使用了session後就算顯式地呼叫session.close(),也不能把連線關閉。連線會由QueuePool連線池進行管理並複用。

3、如果想禁用SQLAlchemy提供的資料庫連線池,只需要在呼叫create_engine是指定連線池為NullPool,SQLAlchemy就會在執行session.close()後立刻斷開資料庫連線。當然,如果session物件被析構但是沒有被呼叫session.close(),則資料庫連線不會被斷開,直到程式終止。

from sqlalchemy.pool import NullPool

#連線資料庫
def getEngine():
    engine = create_engine('mysql://root:[email protected]/baa?charset=utf8mb4', poolclass=NullPool, echo=False)
    #print(engine)
    return engine

def getSession():
    engine = getEngine()
    BaseMode.metadata.create_all(engine)## 資料庫生成表
    # Session = sessionmaker(bind=engine)
    # session = Session()

    DBSession = sessionmaker(bind=engine)
    session = DBSession()

    return session