使用SQLAlchemy進行ORM操作
阿新 • • 發佈:2022-01-29
一、參考文件
http://www.pythondoc.com/flask-sqlalchemy/quickstart.html (官方文件)
https://www.jianshu.com/p/f454a1aa760c
二、ORM解釋
Object (code) - Relational (Database) - Mapping
資料庫的表(table) --> 類(class)
記錄(record,行資料)--> 物件(object)
欄位(field)--> 物件的屬性(attribute)
三、封裝工作
1.Database → Class
使用以下指令生成models檔案
sqlacodegen mysql+pymysql://使用者名稱:密碼@主機:埠號/資料庫名 > 資料庫物件存放路徑
2.資料庫連線
import pymysql from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker class DBSession(object): def __init__(self, db_name=database): engine = create_engine('mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(user, passwd, host, port, db_name), echo=False) Session = sessionmaker() Session.configure(bind=engine) self.session = Session(autocommit=True) # 可以這樣用 db = DBSession() db.session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: 1, AppNotTest.updatedBy:'tester'}, synchronize_session=False) db.session.flush() # 資料提交 db.session.close() # 資料庫連線關閉
3.裝飾器組裝
def db_fixture(db_name='ibupltplatformdb'): class DBFixture(): def __init__(self): self.db_name = db_name def __db_conn(self): self.db = DBSession(self.db_name) self.db_session = self.db.session return self.db_session def __db_finalizer(self): self.db_session.flush() self.db_session.close() def __call__(self, func): def wrapper(*args, **kwargs): session = self.__db_conn() try: res = func(session=session, *args, **kwargs) return res except Exception as ex: logging.error('[DB ERROR] {}'.format(ex)) session.rollback() finally: self.__db_finalizer() return wrapper return DBFixture()
四、使用
1.查
@db_fixture(db_name='platformdb') def query_app_not_test_by_id(session, app_id): """ 1. 查:根據app_id, 等同於SELECT * FROM app_not_test WHERE app_id= ..; :param session: 由裝飾器傳,呼叫該方法時無需傳此引數 :param app_id: 呼叫該方法時傳此引數 :return: """ record = session.query(AppNotTest).filter(AppNotTest.appId == app_id).scalar() #scalar():返回結果小於等於1 return record @db_fixture() def query_app_not_test_by_app_list(session, app_list): # 2. 批量查詢 records = session.query(AppNotTest).filter(AppNotTest.appId.in_(app_list)).all() return records @db_fixture() def query_app_not_test_with_group(session, app_id): # 3. 聯表查詢 record = session.query(AppNotTest.appId.label('app'), Appgroup.groupname.label('group')).\ filter(AppNotTest.appId == app_id, AppNotTest.appId == Appgroup.appid).all() return record @db_fixture() def query_distinct_group_name(session): # 4. group by使用 records = session.query(Appgroup.groupname, func.count(func.distinct(Appgroup.groupname))).group_by(Appgroup.groupname).all() return records @db_fixture() def query_app_not_test_by_pagination(session, limit, offset): # 5. 分頁查詢 records = session.query(AppNotTest).order_by(AppNotTest.datachange_lasttime.desc()).offset(offset).limit(limit).all() return records @db_fixture() def query_by_sql(session, app_id): # 6. sql查詢,結果是元組而非object sql = 'SELECT * FROM app_not_test WHERE appId = :app_id' records = session.execute(text(sql), {'app_id': app_id}).fetchall() return records
2. 增
@db_fixture() def add_app_not_test(session, app_id, status): record = app_not_test_assembler(app_id, status) session.add(record) def app_not_test_assembler(app_id, status): app_not_test = AppNotTest() app_not_test.appId = app_id app_not_test.status = status app_not_test.createdBy = 'tester' app_not_test.updatedBy = 'tester' return app_not_test
3.改
@db_fixture() def update_app_not_test_status(session, app_id, status): session.query(AppNotTest).filter(AppNotTest.appId == app_id).update({AppNotTest.status: status}, synchronize_session=False)
4.刪
@db_fixture() def delete_app_not_test_by_app_id(session, app_id): session.query(AppNotTest).filter(AppNotTest.appId == app_id).delete(synchronize_session='fetch')
五、其它
1、在使用create_engine建立引擎時,如果預設不指定連線池設定的話,一般情況下,SQLAlchemy會使用一個QueuePool繫結在新建立的引擎上。並附上合適的連線池引數。
2、在這種情況下,當你使用了session後就算顯式地呼叫session.close(),也不能把連線關閉。連線會由QueuePool連線池進行管理並複用。
3、如果想禁用SQLAlchemy提供的資料庫連線池,只需要在呼叫create_engine是指定連線池為NullPool,SQLAlchemy就會在執行session.close()後立刻斷開資料庫連線。當然,如果session物件被析構但是沒有被呼叫session.close(),則資料庫連線不會被斷開,直到程式終止。
from sqlalchemy.pool import NullPool #連線資料庫 def getEngine(): engine = create_engine('mysql://root:[email protected]/baa?charset=utf8mb4', poolclass=NullPool, echo=False) #print(engine) return engine def getSession(): engine = getEngine() BaseMode.metadata.create_all(engine)## 資料庫生成表 # Session = sessionmaker(bind=engine) # session = Session() DBSession = sessionmaker(bind=engine) session = DBSession() return session