1. 程式人生 > >Python SQLAlchemy

Python SQLAlchemy

SQLAlchemy介紹

SQLAlchemy是一個基於Python的ORM框架。該框架是建立在DB-API之上,使用關係物件對映進行資料庫操作。

簡而言之就是,將類和物件轉換成SQL,然後使用資料API執行SQL並獲取執行結果。

補充:什麼是DB-API ? 是Python的資料庫介面規範。

在沒有DB-API之前,各資料庫之間的應用介面非常混亂,實現各不相同,

專案需要更換資料庫的時候,需要做大量的修改,非常不方便,DB-API就是為了解決這樣的問題。

pip install sqlalchemy

組成部分:

  -- engine,框架的引擎

  -- connection pooling  資料庫連線池

  -- Dialect  選擇連結資料庫的DB-API種類(實際選擇哪個模組連結資料庫)

  -- Schema/Types  架構和型別

  -- SQL Expression Language   SQL表示式語言

連線資料庫

SQLAlchemy 本身無法操作資料庫,其必須依賴遵循DB-API規範的三方模組,

Dialect 用於和資料API進行互動,根據配置的不同調用不同資料庫API,從而實現資料庫的操作。

# MySQL-PYthon
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

#
pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] # MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> # cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
# 更多 # http://docs.sqlalchemy.org/en/latest/dialects/index.html
不同的資料庫API
from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",
    max_overflow=0,  # 超過連線池大小外最多建立的連線數
    pool_size=5,  # 連線池大小
    pool_timeout=30,  # 連線池中沒有執行緒最多等待時間,否則報錯
    pool_recycle=-1,  # 多久之後對連線池中的連線進行回收(重置)-1不回收
)
連結資料庫

執行原生SQL

from sqlalchemy import create_engine
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",
    max_overflow=0,
    pool_size=5,
)

def test():
    cur = engine.execute("select * from Course")
    result = cur.fetchall()
    print(result)
    cur.close()

if __name__ == '__main__':
    test()
# [(1, '生物', 1), (2, '體育', 2), (3, '物理', 1)]
engine.execute

ORM

一,建立表

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint
import datetime

ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",)

Base = declarative_base()


class UserInfo(Base):
    __tablename__ = "user_info"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    create_time = Column(DateTime, default=datetime.datetime.now)

    __table_args__ = (
        UniqueConstraint("id", "name", name="uni_id_name"),
        Index("name", "email")
    )


def create_db():
    Base.metadata.create_all(ENGINE)


def drop_db():
    Base.metadata.drop_all(ENGINE)



if __name__ == '__main__':
    create_db()
單表的建立
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship
import datetime


ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",)

Base = declarative_base()


# ======一對多示例=======
class UserInfo(Base):
    __tablename__ = "user_info"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    email = Column(String(32), unique=True)
    create_time = Column(DateTime, default=datetime.datetime.now)
    # FK欄位的建立
    hobby_id = Column(Integer, ForeignKey("hobby.id"))
    # 不生成表結構 方便查詢使用
    hobby = relationship("Hobby", backref="user")

    __table_args__ = (
        UniqueConstraint("id", "name", name="uni_id_name"),
        Index("name", "email")
    )


class Hobby(Base):
    __tablename__ = "hobby"

    id = Column(Integer, primary_key=True)
    title = Column(String(32), default="碼程式碼")




def create_db():
    Base.metadata.create_all(ENGINE)


def drop_db():
    Base.metadata.drop_all(ENGINE)



if __name__ == '__main__':
    create_db()
    # drop_db()
一對多的建立
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import Index, UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship
import datetime


ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",)

Base = declarative_base()


# ======多對多示例=======
class Book(Base):
    __tablename__ = "book"

    id = Column(Integer, primary_key=True)
    title = Column(String(32))
    # 不生成表字段 僅用於查詢方便
    tags = relationship("Tag", secondary="book2tag", backref="books")


class Tag(Base):
    __tablename__ = "tag"

    id = Column(Integer, primary_key=True)
    title = Column(String(32))


class Book2Tag(Base):
    __tablename__ = "book2tag"

    id = Column(Integer, primary_key=True)
    book_id = Column(Integer, ForeignKey("book.id"))
    tag_id = Column(Integer, ForeignKey("tag.id"))


def create_db():
    Base.metadata.create_all(ENGINE)

def drop_db():
    Base.metadata.drop_all(ENGINE)

if __name__ == '__main__':
    create_db()
    # drop_db()
多對多的建立

 

 二,對資料庫表的操作(增刪改查)

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models_demo import Tag


ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",)

Session = sessionmaker(bind=ENGINE)

# 每次執行資料庫操作的時候,都需要建立一個session

# 執行緒安全,基於本地執行緒實現每個執行緒用同一個session


session = scoped_session(Session)

# =======執行ORM操作==========
tag_obj = Tag(title="SQLAlchemy")
# 新增
session.add(tag_obj)
# 提交
session.commit()
# 關閉session
session.close()
scoped_session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models_demo import Tag, UserInfo
import threading


ENGINE = create_engine("mysql+pymysql://root:[email protected]:3306/code_record?charset=utf8",)

Session = sessionmaker(bind=ENGINE)

# 每次執行資料庫操作的時候,都需要建立一個session
session = Session()
session = scoped_session(Session)

# ============新增============
# tag_obj = Tag(title="SQLAlchemy")
# # 新增
# session.add(tag_obj)
# session.add_all([
#     Tag(title="Python"),
#     Tag(title="Django"),
# ])
# # 提交
# session.commit()
# # 關閉session
# session.close()

# ============基礎查詢============
# ret1 = session.query(Tag).all()
# ret2 = session.query(Tag).filter(Tag.title == "Python").all()
# ret3 = session.query(Tag).filter_by(title="Python").all()
# ret4 = session.query(Tag).filter_by(title="Python").first()
# print(ret1, ret2, ret3, ret4)

# ============刪除===========
# session.query(Tag).filter_by(id=1).delete()
# session.commit()

# ===========修改===========
session.query(Tag).filter_by(id=22).update({Tag.title: "LOL"})
session.query(Tag).filter_by(id=23).update({"title": "王者農藥"})
session.query(Tag).filter_by(id=24).update({"title": Tag.title + "~"}, synchronize_session=False)
# synchronize_session="evaluate" 預設值進行數字加減
session.commit()
基本的增刪改查
# 條件查詢
ret1 = session.query(Tag).filter_by(id=22).first()
ret2 = session.query(Tag).filter(Tag.id > 1, Tag.title == "LOL").all()
ret3 = session.query(Tag).filter(Tag.id.between(22, 24)).all()
ret4 = session.query(Tag).filter(~Tag.id.in_([22, 24])).first()
from sqlalchemy import and_, or_
ret5 = session.query(Tag).filter(and_(Tag.id > 1, Tag.title == "LOL")).first()
ret6 = session.query(Tag).filter(or_(Tag.id > 1, Tag.title == "LOL")).first()
ret7 = session.query(Tag).filter(or_(
    Tag.id>1,
    and_(Tag.id>3, Tag.title=="LOL")
)).all()
# 萬用字元
ret8 = session.query(Tag).filter(Tag.title.like("L%")).all()
ret9 = session.query(Tag).filter(~Tag.title.like("L%")).all()
# 限制
ret10 = session.query(Tag).filter(~Tag.title.like("L%")).all()[1:2]
# 排序
ret11 = session.query(Tag).order_by(Tag.id.desc()).all()  # 倒序
ret12 = session.query(Tag).order_by(Tag.id.asc()).all()  # 正序
# 分組
ret13 = session.query(Tag.test).group_by(Tag.test).all()
# 聚合函式
from sqlalchemy.sql import func
ret14 = session.query(
    func.max(Tag.id),
    func.sum(Tag.test),
    func.min(Tag.id)
).group_by(Tag.title).having(func.max(Tag.id > 22)).all()
# 連表
ret15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all()
# print(ret15) 得到一個列表套元組 元組裡是兩個物件
ret16 = session.query(UserInfo).join(Hobby).all()
# print(ret16) 得到列表裡面是前一個物件
# 相當於inner join
# for i in ret16:
#     # print(i[0].name, i[1].title)
#     print(i.hobby.title)
ret17 = session.query(Hobby).join(UserInfo, isouter=True).all()
ret17_1 = session.query(UserInfo).join(Hobby, isouter=True).all()
ret18 = session.query(Hobby).outerjoin(UserInfo).all()
ret18_1 = session.query(UserInfo).outerjoin(Hobby).all()
# 相當於left join
print(ret17)
print(ret17_1)
print(ret18)
print(ret18_1)
常用操作
# 基於relationship的FK
# 新增
user_obj = UserInfo(name="提莫", hobby=Hobby(title="種蘑菇"))
session.add(user_obj)

hobby = Hobby(title="彈奏一曲")
hobby.user = [UserInfo(name="琴女"), UserInfo(name="妹紙")]
session.add(hobby)
session.commit()

# 基於relationship的正向查詢
user_obj_1 = session.query(UserInfo).first()
print(user_obj_1.name)
print(user_obj_1.hobby.title)

# 基於relationship的反向查詢
hb = session.query(Hobby).first()
print(hb.title)
for i in hb.user:
    print(i.name)

session.close()
基於relationship的FK
# 新增
book_obj = Book(title="Python原始碼剖析")
tag_obj = Tag(title="Python")
b2t = Book2Tag(book_id=book_obj.id, tag_id=tag_obj.id)
session.add_all([
    book_obj,
    tag_obj,
    b2t,
])
session.commit()

#  上面有坑哦~~~~
book = Book(title="測試")
book.tags = [Tag(title="測試標籤1"), Tag(title="測試標籤2")]
session.add(book)
session.commit()

tag = Tag(title="LOL")
tag.books = [Book(title="大龍重新整理時間"), Book(title="小龍重新整理時間")]
session.add(tag)
session.commit()

# 基於relationship的正向查詢
book_obj = session.query(Book).filter_by(id=4).first()
print(book_obj.title)
print(book_obj.tags)
# 基於relationship的反向查詢
tag_obj = session.query(Tag).first()
print(tag_obj.title)
print(tag_obj.books)
基於relationship的M2M