Python SQLAlchemy
阿新 • • 發佈:2019-01-02
SQLAlchemy介紹
SQLAlchemy是一個基於Python的ORM框架。該框架是建立在DB-API之上,使用關係物件對映進行資料庫操作。
簡而言之就是,將類和物件轉換成SQL,然後使用資料API執行SQL並獲取執行結果。
補充:什麼是DB-API ? 是Python的資料庫介面規範。
在沒有DB-API之前,各資料庫之間的應用介面非常混亂,實現各不相同,
專案需要更換資料庫的時候,需要做大量的修改,非常不方便,DB-API就是為了解決這樣的問題。
pip install sqlalchemy
組成部分:
-- engine,框架的引擎
-- connection pooling 資料庫連線池
-- Dialect 選擇連結資料庫的DB-API種類(實際選擇哪個模組連結資料庫)
-- Schema/Types 架構和型別
-- SQL Expression Language SQL表示式語言
連線資料庫
SQLAlchemy 本身無法操作資料庫,其必須依賴遵循DB-API規範的三方模組,
Dialect 用於和資料API進行互動,根據配置的不同調用不同資料庫API,從而實現資料庫的操作。
# MySQL-PYthon mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> #不同的資料庫APIpymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] # MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> # cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]# 更多 # http://docs.sqlalchemy.org/en/latest/dialects/index.html
from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8", max_overflow=0, # 超過連線池大小外最多建立的連線數 pool_size=5, # 連線池大小 pool_timeout=30, # 連線池中沒有執行緒最多等待時間,否則報錯 pool_recycle=-1, # 多久之後對連線池中的連線進行回收(重置)-1不回收 )連結資料庫
執行原生SQL
from sqlalchemy import create_engine engine = create_engine( "mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8", max_overflow=0, pool_size=5, ) def test(): cur = engine.execute("select * from Course") result = cur.fetchall() print(result) cur.close() if __name__ == '__main__': test() # [(1, '生物', 1), (2, '體育', 2), (3, '物理', 1)]engine.execute
ORM
一,建立表
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint import datetime ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",) Base = declarative_base() class UserInfo(Base): __tablename__ = "user_info" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) create_time = Column(DateTime, default=datetime.datetime.now) __table_args__ = ( UniqueConstraint("id", "name", name="uni_id_name"), Index("name", "email") ) def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db()單表的建立
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint, ForeignKey from sqlalchemy.orm import relationship import datetime ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",) Base = declarative_base() # ======一對多示例======= class UserInfo(Base): __tablename__ = "user_info" id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) email = Column(String(32), unique=True) create_time = Column(DateTime, default=datetime.datetime.now) # FK欄位的建立 hobby_id = Column(Integer, ForeignKey("hobby.id")) # 不生成表結構 方便查詢使用 hobby = relationship("Hobby", backref="user") __table_args__ = ( UniqueConstraint("id", "name", name="uni_id_name"), Index("name", "email") ) class Hobby(Base): __tablename__ = "hobby" id = Column(Integer, primary_key=True) title = Column(String(32), default="碼程式碼") def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db() # drop_db()一對多的建立
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy import Index, UniqueConstraint, ForeignKey from sqlalchemy.orm import relationship import datetime ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",) Base = declarative_base() # ======多對多示例======= class Book(Base): __tablename__ = "book" id = Column(Integer, primary_key=True) title = Column(String(32)) # 不生成表字段 僅用於查詢方便 tags = relationship("Tag", secondary="book2tag", backref="books") class Tag(Base): __tablename__ = "tag" id = Column(Integer, primary_key=True) title = Column(String(32)) class Book2Tag(Base): __tablename__ = "book2tag" id = Column(Integer, primary_key=True) book_id = Column(Integer, ForeignKey("book.id")) tag_id = Column(Integer, ForeignKey("tag.id")) def create_db(): Base.metadata.create_all(ENGINE) def drop_db(): Base.metadata.drop_all(ENGINE) if __name__ == '__main__': create_db() # drop_db()多對多的建立
二,對資料庫表的操作(增刪改查)
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from models_demo import Tag ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次執行資料庫操作的時候,都需要建立一個session # 執行緒安全,基於本地執行緒實現每個執行緒用同一個session session = scoped_session(Session) # =======執行ORM操作========== tag_obj = Tag(title="SQLAlchemy") # 新增 session.add(tag_obj) # 提交 session.commit() # 關閉session session.close()scoped_session
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from models_demo import Tag, UserInfo import threading ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",) Session = sessionmaker(bind=ENGINE) # 每次執行資料庫操作的時候,都需要建立一個session session = Session() session = scoped_session(Session) # ============新增============ # tag_obj = Tag(title="SQLAlchemy") # # 新增 # session.add(tag_obj) # session.add_all([ # Tag(title="Python"), # Tag(title="Django"), # ]) # # 提交 # session.commit() # # 關閉session # session.close() # ============基礎查詢============ # ret1 = session.query(Tag).all() # ret2 = session.query(Tag).filter(Tag.title == "Python").all() # ret3 = session.query(Tag).filter_by(title="Python").all() # ret4 = session.query(Tag).filter_by(title="Python").first() # print(ret1, ret2, ret3, ret4) # ============刪除=========== # session.query(Tag).filter_by(id=1).delete() # session.commit() # ===========修改=========== session.query(Tag).filter_by(id=22).update({Tag.title: "LOL"}) session.query(Tag).filter_by(id=23).update({"title": "王者農藥"}) session.query(Tag).filter_by(id=24).update({"title": Tag.title + "~"}, synchronize_session=False) # synchronize_session="evaluate" 預設值進行數字加減 session.commit()基本的增刪改查
# 條件查詢 ret1 = session.query(Tag).filter_by(id=22).first() ret2 = session.query(Tag).filter(Tag.id > 1, Tag.title == "LOL").all() ret3 = session.query(Tag).filter(Tag.id.between(22, 24)).all() ret4 = session.query(Tag).filter(~Tag.id.in_([22, 24])).first() from sqlalchemy import and_, or_ ret5 = session.query(Tag).filter(and_(Tag.id > 1, Tag.title == "LOL")).first() ret6 = session.query(Tag).filter(or_(Tag.id > 1, Tag.title == "LOL")).first() ret7 = session.query(Tag).filter(or_( Tag.id>1, and_(Tag.id>3, Tag.title=="LOL") )).all() # 萬用字元 ret8 = session.query(Tag).filter(Tag.title.like("L%")).all() ret9 = session.query(Tag).filter(~Tag.title.like("L%")).all() # 限制 ret10 = session.query(Tag).filter(~Tag.title.like("L%")).all()[1:2] # 排序 ret11 = session.query(Tag).order_by(Tag.id.desc()).all() # 倒序 ret12 = session.query(Tag).order_by(Tag.id.asc()).all() # 正序 # 分組 ret13 = session.query(Tag.test).group_by(Tag.test).all() # 聚合函式 from sqlalchemy.sql import func ret14 = session.query( func.max(Tag.id), func.sum(Tag.test), func.min(Tag.id) ).group_by(Tag.title).having(func.max(Tag.id > 22)).all() # 連表 ret15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all() # print(ret15) 得到一個列表套元組 元組裡是兩個物件 ret16 = session.query(UserInfo).join(Hobby).all() # print(ret16) 得到列表裡面是前一個物件 # 相當於inner join # for i in ret16: # # print(i[0].name, i[1].title) # print(i.hobby.title) ret17 = session.query(Hobby).join(UserInfo, isouter=True).all() ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all() ret18 = session.query(Hobby).outerjoin(UserInfo).all() ret18_1 = session.query(UserInfo).outerjoin(Hobby).all() # 相當於left join print(ret17) print(ret17_1) print(ret18) print(ret18_1)常用操作
# 基於relationship的FK # 新增 user_obj = UserInfo(name="提莫", hobby=Hobby(title="種蘑菇")) session.add(user_obj) hobby = Hobby(title="彈奏一曲") hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹紙")] session.add(hobby) session.commit() # 基於relationship的正向查詢 user_obj_1 = session.query(UserInfo).first() print(user_obj_1.name) print(user_obj_1.hobby.title) # 基於relationship的反向查詢 hb = session.query(Hobby).first() print(hb.title) for i in hb.user: print(i.name) session.close()基於relationship的FK
# 新增 book_obj = Book(title="Python原始碼剖析") tag_obj = Tag(title="Python") b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id) session.add_all([ book_obj, tag_obj, b2t, ]) session.commit() # 上面有坑哦~~~~ book = Book(title="測試") book.tags = [Tag(title="測試標籤1"), Tag(title="測試標籤2")] session.add(book) session.commit() tag = Tag(title="LOL") tag.books = [Book(title="大龍重新整理時間"), Book(title="小龍重新整理時間")] session.add(tag) session.commit() # 基於relationship的正向查詢 book_obj = session.query(Book).filter_by(id=4).first() print(book_obj.title) print(book_obj.tags) # 基於relationship的反向查詢 tag_obj = session.query(Tag).first() print(tag_obj.title) print(tag_obj.books)基於relationship的M2M