1. 程式人生 > 其它 >sqlalchemy模組介紹、單表操作、一對多表操作、多對多表操作、flask整合.

sqlalchemy模組介紹、單表操作、一對多表操作、多對多表操作、flask整合.

今日內容概要

  • sqlalchemy介紹和快速使用
  • 單表操作增刪查改
  • 一對多
  • 多對多
  • flask整合

內容詳細

1、sqlalchemy介紹和快速使用

# SQLAlchemy是一個基於 Python實現的ORM框架

# django的orm框架---》只能在django中用,不能單獨用

# SQLAlchemy單獨的,可以整合到任意框架中

# peewee:輕量級

# python的非同步orm框架不多,  sanic, fastapi---》一旦用了非同步,後續所有都需要用非同步---》操作mysql,aiomysql--》操作redis,使用aioredis

# 公司選擇
	-第一:peewee-async
	-第二:框架是非同步---》沒有使用非同步orm框架---》SQLAlchemy---》生成和遷移表---》查詢操作資料用原生操作
  
  
# 寫django專案---》庫和表已經有了
	-正常操作django中建表模型---》遷移---》表
	-反向生成models--》表---》models.py----》改表---》再反向生成
	python manage.py inspectdb > app/models.py

1.1 執行原生sql

# 執行原生sql快速使用
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

# 第一步:建立engine
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超過連線池大小外最多建立的連線
    pool_size=5,  # 連線池大小
    pool_timeout=30,  # 池中沒有執行緒最多等待的時間,否則報錯
    pool_recycle=-1  # 多久之後對執行緒池中的執行緒進行一次連線的回收(重置)
)


# 第二步:使用
def task():
    conn = engine.raw_connection()  # 從連線池中取一個連線
    cursor = conn.cursor()
    sql = "select * from cmd"
    cursor.execute(sql)
    print(cursor.fetchall())


if __name__ == '__main__':
    for i in range(20):
        t = threading.Thread(target=task)
        t.start()

# 查詢mysql的客戶端連線數

2、單表操作增刪查改

2.1 表遷移

# 不能建立資料庫(django orm也不能)

# 只能做表的建立和刪除,不能做表更改(django orm能)---》藉助於第三方實現

###### 第一步:生成基類,所有表模型都要繼承這個基類
	django 的orm繼承一個父類,Base就是那個父類

###### 第二步:寫表模型,繼承父類,寫欄位   (注意區別於django 的orm)
	django的default--》可不可以傳個函式記憶體地址---》插入的時候通過函式運算完得到的值

###### 第三步:遷移,通過表模型,生成表

建立models.py

import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 第一步:生成基類,所有表模型都要繼承這個基類
# django 的orm繼承一個父類,Base就是那個父類
Base = declarative_base()


# 第二步:寫表模型,繼承父類,寫欄位   (注意區別於django 的orm)
# django的default--》可不可以傳個函式記憶體地址---》插入的時候通過函式運算完得到的值
class Users(Base):
    id = Column(Integer, primary_key=True, autoincrement=True)  # id 主鍵
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可為空
    email = Column(String(32), unique=True)  # 唯一
    # datetime.datetime.now不能加括號,加了括號,以後永遠是當前時間
    ctime = Column(DateTime, default=datetime.datetime.now)  # 預設值
    extra = Column(Text, nullable=True)  # 大文字,可以為空

    __tablename__ = 'lqz_users'  # 資料庫表名稱,如果不寫,就報錯
    # __table_args__ = (
    #     UniqueConstraint('id', 'name', name='uix_id_name'),  # 聯合唯一
    #     Index('ix_id_name', 'name', 'email'),  # 聯合索引
    # )

# 聚簇索(mysql主鍵自動建索引,聚簇索引,mysql基於聚簇索引構建的B+樹),一定會有,沒有顯示建主鍵,mysql會隱藏一個
# 輔助索引:手動建的叫輔助索引---》單獨減了索引---》如果你的輔助索引過多,非常影響插入效率,適度建索引

建立演示檔案:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from threading import Thread
from models import Base


# 第三步:遷移,通過表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超過連線池大小外最多建立的連線
    pool_size=5,  # 連線池大小
    pool_timeout=30,  # 池中沒有執行緒最多等待的時間,否則報錯
    pool_recycle=-1  # 多久之後對執行緒池中的執行緒進行一次連線的回收(重置)
)


def create_table():
    # 通過engine這個連線配置,創建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通過engine這個連線配置,刪除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # create_table()  # 建立表
    delete_table()  # 刪除表

2.2 簡單的表操作

### 操作表,增加一條記錄,以後都用conn/session(命名可以更改)操作

# 第一步:建立engin

# 第二步:通過session得到連線物件
	Session = sessionmaker(bind=engine)
	session = Session()

# # 第三步:例項化得到模型類的物件,增加到資料庫中
	usr=Users(name='lqz001')
	session.add(usr)

# # 第四步:提交事務
	session.commit()

2.3 基於scoped_session實現執行緒安全

# # 以後操作資料,都用session物件---》定義在flask的函式外部還是內部?
# # 放內部沒問題,每次都生成一個新的session,耗費資源
# # 如果定義在函式外部,會存在 多執行緒併發使用同一個變數session,要把session做成併發安全的
Session = sessionmaker(bind=engine)
session = scoped_session(Session)  # 也是基於local,給每一個執行緒自己創造一個session

# # 只需要記住,如果是多執行緒使用,或者在web框架中,使用scoped_session生成session就可以了
# # 整合到flask中,有flask-sqlalchemy第三方,內部已經處理了scoped_session
# # 全域性用這個一個session,不用擔心併發不安全
usr = Users(name='lqz002')
session.add(usr)  # 執行緒一用:取local中取執行緒1的那個session,如果就給,沒有就重新創造一個

# # 第四步:提交事務
session.commit()

測試執行緒安全

# 執行緒一用:
	取local中取執行緒1的那個session,如果就給,沒有就重新創造一個
# 執行緒二用:
	取local中取執行緒2的那個session,如果就給,沒有就重新創造一個


# # 測試:開3個執行緒,如果定義全域性的session,在3個執行緒中用,session物件應該是同一個
Session = sessionmaker(bind=engine)
session = Session()
# session = scoped_session(Session)


def task():
    # usr = Users(name='lqz003')
    # session.add(usr)
    # session.commit()
    # print(session.registry.registry.value) # <sqlalchemy.orm.scoping.scoped_session object at 0x7f8fbceeea60>
    print(session)  # <sqlalchemy.orm.scoping.scoped_session object at 0x7f8fbceeea60>


# 開3個執行緒,如果定義scoped_session,在3個執行緒中用,session物件應該是不是同一個,獨有的
if __name__ == '__main__':
    for i in range(3):
        t = Thread(target=task)
        t.start()

2.4 基本增刪查改

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from sqlalchemy.orm import scoped_session
from models import Base

# 第三步:遷移,通過表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超過連線池大小外最多建立的連線
    pool_size=5,  # 連線池大小
    pool_timeout=30,  # 池中沒有執行緒最多等待的時間,否則報錯
    pool_recycle=-1  # 多久之後對執行緒池中的執行緒進行一次連線的回收(重置)
)


def create_table():
    # 通過engine這個連線配置,創建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通過engine這個連線配置,刪除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # create_table()
    # delete_table()
    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)
    
    
    ### 1 增加操作
    # 增加一個
    obj1 = Users(name="lqz003")
    session.add(obj1)
    
    # 增加多個,不同物件
    session.add_all([
        Users(name="lqz009"),
        Users(name="lqz008"),
    ])
    session.commit()
    
    
    # 2 刪除操作---》查出來再刪---》
    session.query(Users).filter(Users.id > 2).delete()
    session.commit()
    
    
    # 3 修改操作--》查出來改
    # 傳字典
    session.query(Users).filter(Users.id > 0).update({"name": "lqz"})
    # # 類似於django的F查詢
    # # 字串加
    # session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
    # # 數字加
    # session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
    session.commit()
    
    
    # 4 查詢操作----》
    r1 = session.query(Users).all()  # 查詢所有
    # 只取age列,把name重新命名為xx
    # 原生sql:select name as xx,age from user;
    # r2 = session.query(Users.name.label('xx'), Users.age).all()

    # # filter傳的是表示式,filter_by傳的是引數
    # r3 = session.query(Users).filter(Users.name == "lqz").all()
    # # r3 = session.query(Users).filter(Users.id >= 1).all()
    # r4 = session.query(Users).filter_by(name='lqz').all()
    # r5 = session.query(Users).filter_by(name='lqz').first()

    # :value 和:name 相當於佔位符,用params傳引數
    # r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='lqz').order_by(
    #     Users.id).all()
    # 自定義查詢sql
    # r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all()

2.5 更多查詢操作

    # 更多查詢
    #  條件
    # select * form user where name =lqz
    # ret = session.query(Users).filter_by(name='lqz').all()

    # 表示式,and條件連線
    # select * from user where id >1 and name = lqz
    # ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()
    # select * from user where id between 1,3  and name = lqz
    # ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'lqz').all()

    # 注意下劃線
    # select * from user where id in (1,3,4)
    # ret = session.query(Users).filter(Users.id.in_([1, 3, 4])).all()

    # # ~非,除。。外
    # select * from user where id not in (1,3,4)
    # ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()

    # # # 二次篩選
    # # ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz'))).all()
    # from sqlalchemy import and_, or_
    #
    # # # or_包裹的都是or條件,and_包裹的都是and條件
    # ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    # ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    # ret = session.query(Users).filter(
    #     or_(
    #         Users.id < 2,
    #         and_(Users.name == 'eric', Users.id > 3),
    #         Users.extra != ""
    #     )).all()

    # # 萬用字元,以e開頭,不以e開頭
    # ret = session.query(Users).filter(Users.name.like('e%')).all()
    # ret = session.query(Users).filter(~Users.name.like('e%')).all()

    # # 限制,用於分頁,區間
    # ret = session.query(Users)[1:2]

    # # 排序,根據name降序排列(從大到小)
    # ret = session.query(Users).order_by(Users.id.desc()).all()

    # # 第一個條件重複後,再按第二個條件升序排
    # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

    # # 分組
    # from sqlalchemy.sql import func
    # select * from user group by user.extra;
    # ret = session.query(Users).group_by(Users.extra).all()

    # # 分組之後取最大id,id之和,最小id
    # select max(id),sum(id),min(id) from user group by name ;
    # ret = session.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name).all()

    # haviing篩選
    # select max(id),sum(id),min(id) from user group by name  having min(id)>2;
    # ret = session.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2).all()

    # select max(id),sum(id),min(id) from user where id >=1 group by name  having min(id)>2;
    # ret = session.query(
    #     func.max(Users.id),
    #     func.sum(Users.id),
    #     func.min(Users.id)).filter(Users.id>=1).group_by(Users.name).having(func.min(Users.id) > 2).all()

    # 連表(預設用forinkey關聯)
    # select * from user,favor where user.id=favor.id
    # ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

    # join表,預設是inner join
    # select * from Person inner join favor on person.favor=favor.id;
    # ret = session.query(Person).join(Favor).all()
    
    # isouter=True 外連,表示Person left join Favor,沒有右連線,反過來即可
    # ret = session.query(Person).join(Favor, isouter=True).all()
    # ret = session.query(Favor).join(Person, isouter=True).all()

    # 列印原生sql
    # aa = session.query(Person).join(Favor, isouter=True)
    # print(aa)

    # 自己指定on條件(連表條件),第二個引數,支援on多個條件,用and_,同上
    # select * from person left join favor on person.id=favor.id;
    # ret = session.query(Person).join(Favor, Person.id == Favor.id, isouter=True).all()

    # 組合(瞭解)UNION 操作符用於合併兩個或多個 SELECT 語句的結果集
    # union和union all的區別?
    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    # ret = q1.union(q2).all()

    # q1 = session.query(Users.name).filter(Users.id > 2)
    # q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    # ret = q1.union_all(q2).all()

2.6 執行原生sql

    # 執行原生sql
    # 查詢
    cursor = session.execute('select * from users')
    result = cursor.fetchall()

    # 新增
    cursor = session.execute('insert into users(name) values(:value)', params={"value": 'lqz'})
    session.commit()
    print(cursor.lastrowid)

3、一對多表操作

3.1 表模型建立

class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='籃球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是類名
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 外來鍵
    # 跟資料庫無關,不會新增欄位,只用於快速連結串列操作
    # 類名,backref用於反向查詢   # 正向查詢按欄位,反向查詢按 pers
    hobby = relationship('Hobby', backref='pers')

3.2 操作表

# 一對多
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker
from models import Users
from sqlalchemy.orm import scoped_session
from models import Base


# 第三步:遷移,通過表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超過連線池大小外最多建立的連線
    pool_size=5,  # 連線池大小
    pool_timeout=30,  # 池中沒有執行緒最多等待的時間,否則報錯
    pool_recycle=-1  # 多久之後對執行緒池中的執行緒進行一次連線的回收(重置)
)


def create_table():
    # 通過engine這個連線配置,創建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通過engine這個連線配置,刪除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # create_table()
    # delete_table()

    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)

    from models import Hobby, Person

    # 1 增加資料
    # 方式一
    session.add_all([
        Hobby(caption='乒乓球'),
        Hobby(caption='羽毛球'),
        Person(name='張三', hobby_id=1),
        Person(name='李四', hobby_id=1),
    ])
    session.commit()
    
    # 方式二
    person = Person(name='張九', hobby=Hobby(caption='姑娘'))
    session.add(person)
    
    # 方式三
    hb = Hobby(caption='保齡球')
    # 反向欄位
    hb.pers = [Person(name='lqz01'), Person(name='lqz02')]
    session.add(hb)
    session.commit()
    
    
    # 2 查詢
    # 正向查詢
    person = session.query(Person).first()
    print(person.name)
    # 基於物件的跨表查詢
    print(person.hobby.caption)
    # 反向查詢
    v = session.query(Hobby).first()
    print(v.caption)
    print(v.pers)  # 多條

    # 連結串列查詢
    # select person.name ,hobby.caption from person left join bobby on person.hobby_id=hobby.id;
    person_list = session.query(Person.name, Hobby.caption).join(Hobby, isouter=True).all()
    # person_list = session.query(Person,Hobby).join(Hobby, isouter=True).all()
    for row in person_list:
        # print(row.name,row.caption)
        print(row[0].name, row[1].caption)

    person_list = session.query(Person).all()
    for row in person_list:
        print(row.name, row.hobby.caption)

    obj = session.query(Hobby).filter(Hobby.id == 1).first()
    persons = obj.pers
    print(persons)
    session.close()

4、多對多表操作

4.1 表模型建立

# boy girl 相親,一個boy可以約多個女生,一個女生可以相多個男生
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)

    # 與生成表結構無關,僅用於查詢方便,放在哪個單表中都可以
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

4.2 操作表

# 多對多
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models import Base

# 第三步:遷移,通過表模型,生成表
engine = create_engine(
    "mysql+pymysql://root:[email protected]:3306/db01?charset=utf8",
    max_overflow=0,  # 超過連線池大小外最多建立的連線
    pool_size=5,  # 連線池大小
    pool_timeout=30,  # 池中沒有執行緒最多等待的時間,否則報錯
    pool_recycle=-1  # 多久之後對執行緒池中的執行緒進行一次連線的回收(重置)
)


def create_table():
    # 通過engine這個連線配置,創建出所有使用Base管理的表
    Base.metadata.create_all(engine)


def delete_table():
    # 通過engine這個連線配置,刪除出所有使用Base管理的表
    Base.metadata.drop_all(engine)


from models import Boy, Girl, Boy2Girl

if __name__ == '__main__':
    # create_table()
    # delete_table()
    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)

    # 1 增加資料
    #  方式一
    session.add_all([
        Boy(name='彭于晏'),
        Boy(name='劉德華'),
        Girl(name='劉亦菲'),
        Girl(name='迪麗熱巴'),
    ])
    session.commit()
    s2g = Boy2Girl(boy_id=1, girl_id=1)
    session.add(s2g)
    session.commit()

    # 方式二
    boy = Boy(name='lqz')
    boy.girls = [Girl(name='小紅'), Girl(name='校花')]
    session.add(boy)
    session.commit()

    # 方式三
    girl = Girl(name='小梅')
    girl.boys = [Boy(name='lqz001'), Boy(name='lqz002')]
    session.add(girl)
    session.commit()

    # 基於物件的跨表查
    # 使用relationship正向查詢
    v = session.query(Boy).first()
    print(v.name)
    print(v.girls)

    # 使用relationship反向查詢
    v = session.query(Girl).first()
    print(v.name)
    print(v.boys)

5、flask整合

# Flask_SQLAlchemy 操作資料庫

# flask_migrate  模擬django的表遷移
	pip3 install flask_migrate


# flask_migrate使用步驟
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()  # 全域性SQLAlchemy
app = Flask(__name__)
app.config.from_object('settings.DevelopmentConfig')

# 將db註冊到app中,載入配置檔案,flask-session,用一個類包裹一下app
db.init_app(app)

# flask_script建立命令 runserver命令 ,自定義名字
# 下面三句會創建出兩個命令:runserver  db 命令(flask_migrate)
manager=Manager(app)
Migrate(app, db)
manager.add_command('db',MigrateCommand )  # 新增一個db命令,原來有了runserver命令了



# 直接使用命令遷移表即可
# 1 初始化
python3 manage.py db init  # 剛開始幹,生成一個migrate資料夾

# 2 建立表,修改表
python3 manage.py db migrate   # 等同於 makemigartions
python3 manage.py db upgrade   # 等同於 migrate
# Flask_SQLAlchemy給你包裝了基類,和session,以後拿到db

db = SQLAlchemy()  # 全域性 SQLAlchemy

# 增刪查改資料-->併發安全
db.session.query()

# 表模型要繼承基表
class Users(db.Model):