sqlalchemy 一對多/多對一/多對多/一對一關係定義設定全方位操作方法
sqlalchemy 作為一款ORM在操作資料庫方面非常的方便,這裡總結了一些對應關係的設定以及查詢方法!
使用外來鍵關聯表:
表設計
from sqlalchemy import Column, ForeignKey from sqlalchemy.types import String, Integer, CHAR, BIGINT class Blog(BaseModel): __tablename__ = 'blog' id = Column(BIGINT, primary_key=True, autoincrement=True) title = Column(String(64), server_default='', nullable=False) text = Column(String, server_default='', nullable=False) user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False) create = Column(BIGINT, index=True, server_default='0', nullable=False) class User(BaseModel): __tablename__ = 'user' id = Column(BIGINT, primary_key=True, autoincrement=True) name = Column(String(32), server_default='', nullable=False) username = Column(String(32), index=True, server_default='', nullable=True) password = Column(String(64), server_default='', nullable=False)
提交資料
session = Session()
user = User(name='first', username=u'新的')
session.add(user)
session.flush()#這個為了放回 id
blog = Blog(title=u'第一個', user=user.id)
session.add(blog)
session.commit()
一: 一對多關係
表設計
class Parent(Base): # 一 __tablename__ = 'parent' id = Column(Integer, primary_key=True) name = Column(String(64),nullable=False) full_name = Column(String(64)) children = relationship("Child") # 預設返回的是列表,collection_class=set 加他返回的是集合, #collection_class=attribute_mapped_collection('name') 返回的是name 字典 值從屬性中取 #collection_class=mapped_collection(lambda children: children.name.lower() 這個可以自定義值 # 在父表類中通過 relationship() 方法來引用子表的類集合 class Child(Base): #多 __tablename__ = 'child' id = Column(Integer, primary_key=True) name = Column(String(64),nullable=False) full_name = Column(String(64)) parent_id = Column(Integer, ForeignKey('parent.id')) # 在子表類中通過 foreign key (外來鍵)引用父表的參考欄位
新增資料
parent = Parent(name='morgan', full_name='morganlions') parent.children = Child(name='child 1 name',full_name='child 1 full name') session.add(parent) session.commit() #如果parent 已經存在 parent = session.query(Parent).filter(Parent.name=='Morgan') children = [ Child(name='child 2', full_name='child 2 full name', parent_obj = parent), Child(name='child 3', full_name='child 3 full name', parent_obj = parent), ] session.add_all(children) session.commit()
查詢資料(單向關係)
parent = session.query(Parent).get(1) print(parent.children) #一對多查詢 parent = session.query(Parent).filter(Parent.children.any(Child.name==u'child 1')).first()
二:多對一關係
這個相比上面的一對多而言是雙向的關係.
表設計
class Parent(Base):#一 __tablename__ = 'parent' id = Column(Integer, primary_key=True) name = Column(String(64), nullable=False) full_name = Column(String(64)) children = relationship("Child", back_populates="parent", lazy="dynamic") class Child(Base):#多 __tablename__ = 'child' id = Column(Integer, primary_key=True) name = Column(String(64), nullable=False) full_name = Column(String(64)) parent_id = Column(Integer, ForeignKey('parent.id')) parent = relationship("Parent", order_by = 'Parent.id' back_populates="children") # 子表類中附加一個 relationship() 方法 # 並且在(父)子表類的 relationship() 方法中使用 relationship.back_populates 引數
新增資料
這個和單向的一樣,不過可以反過來操作
查詢資料(雙向關係)
parent = session.query(Parent).get(1) print(parent.children) children = session.query(Child).get(1) print(children.parent) # lazy="dynamic" 我們新增 這個後可以子查詢關聯表的時候自由的選擇 print(parent.children.all()) print(patent.children.filter(Child.name='child1').first() #多對一查詢父親 children = session.query(Child).filter(Child.parent_obj.has(Parent.name=='morgan')).first()
三:多對多關係
單向多對多
通過中間表 association 關聯表 連結 left 和 right 兩張表 left -> right ,建立的是單向關係# 多對多關係中的兩個表之間的一個關聯表 association_table = Table('association', Base.metadata, Column('left_id', Integer, ForeignKey('left.id')), Column('right_id', Integer, ForeignKey('right.id')) ) class Parent(Base): __tablename__ = 'left' id = Column(Integer, primary_key=True) children = relationship("Child",secondary=association_table) # 在父表中的 relationship() 方法傳入 secondary 引數,其值為關聯表的表名 class Child(Base): __tablename__ = 'right' id = Column(Integer, primary_key=True)
雙向多對多
通過中間表 association 關聯表 連結 left 和 right 兩張表 left <-> right ,建立的是雙向關係association_table = Table('association', Base.metadata, Column('left_id', Integer, ForeignKey('left.id')), Column('right_id', Integer, ForeignKey('right.id')) ) class Parent(Base): __tablename__ = 'left' id = Column(Integer, primary_key=True) children = relationship("Child", secondary=association_table,back_populates="parents") class Child(Base): __tablename__ = 'right' id = Column(Integer, primary_key=True) parents = relationship( "Parent",secondary=association_table,back_populates="children")
下面是一種比較簡潔的建立雙向多對多關係的方法,上面的方法中我們用的是 back_populates ,下面我們將其換成,backref,這樣我們只需要在第一個雙向關係建立的時候,建立反向雙向關係。
association_table = Table('association', Base.metadata,
Column('left_id', Integer, ForeignKey('left.id')),
Column('right_id', Integer, ForeignKey('right.id'))
)
class Parent(Base):
__tablename__ = 'left'
id = Column(Integer, primary_key=True)
name = Column(String(64),nullable=False, index=True)
full_name = Column(String(64))
children = relationship("Child",secondary=association_table, backref="parents")
class Child(Base):
__tablename__ = 'right'
name = Column(String(64),nullable=False, index=True)
full_name = Column(String(64))
id = Column(Integer, primary_key=True)
secondary 可以作為回撥函式返回一個值
class Parent(Base):
__tablename__ = 'left'
id = Column(Integer, primary_key=True)
children = relationship("Child",secondary=lambda: association_table,backref="parents")
多對多關係新增資料:
parent = Parent(name='Morgan',full_name='MorganLions')
parent.children = [Child(name='child name',full_name='child full name')]
#提交資料
session.add(parent)
session.commit()
#如果是雙向關係也可以這樣新增
child = Child(name='child name',full_name='child full name'}
child.parents = [Parent(name='Morgan', full_name ='Morgan Lions')]
session.add(child)
session.commit()
session = Session()
#找出資料在新增
parent = session.query(Parent).filter(Parent.name=='Morgan').one()
parent.children = [Child(name='child name',full_name='child full name')]
session.commit()
#刪除多對多中間關係,不刪除實體,這裡我們對應的是children
#1清除parent 中的 children 資料
parent = session.query(Parent).filter(Parent.name='Morgan').one()
parent.children=[]
session.commit()
#2刪除兩個表對應的關係
children = session.query(Child).filter(Child.name='child 1').one()
parent.children.remove(children)
session.commit()
#刪除實體,不刪除關係
# 1:建立要實驗的資料
parent = session.query(Parent).filter(Parent.name=='Morgan').one()
children = Child(name='child 5)
parent.children = [children]
session.commit()
#2: 刪除實體
children = session.query(Child).filter(Child.name=='child 5').one()
session.delete(children)
session.commit()
多對多關係查詢資料
1:這是一種方法通過物件的方法去轉換,當然你也可以直接返回去用,我需要列表的形式,還有另外的一種方法,自己寫個方法在模型檔案中,就是你建立資料庫檔案的表中新增相應的方法,這裡我貼出自己的方法,歡迎指正,謝謝。
#單條資料查詢
parent = session.query(Parent).get(1)
#找到兒子
print(parent.children)
children = session.query(Child).get(1)
#找到父親
print(children.parent)
#多條資料查詢
parent = session.query(Parent).order_by(Parent.id)
count = parent.count() #計數
page = ...
limit = ...
parent = parent.offset(page).limit(limit)
result = []
for p in parent.all():
arg ={}
arg['id'] = p.id
arg['name'] = p.name
arg['full_name'] = p.full_name
arg['children'] = {c.name:c.full_name for c in p.children}
result.append(arg)
print(result)
2:在模型中新增如下的方法,這個方法,如果去關聯多對多查詢,比較慢,因為本質上而言,還是和資料庫的多表查詢沒有什麼區別,還是要轉換成對應的sql語句去執行,速度自然就慢了,建議在列表中儘量不要用這種多表聯合查詢。效能很不好。
def column_dict(self, params=None, operation=None, relation=None):
'''
sql查詢返回字典
:param params: 需要顯示的欄位
:param operation:操作方法
{'one':func1 'two': func2 , 'param':'提示術語'}
{'one':func1 'two': 'param':'提示術語'}
{'one':func1 'two': func2 }
param 作為第二個函式的引數,也可以作為單獨的提示
:param relation
:return:
'''
model_dict = dict(self.__dict__)
del model_dict['_sa_instance_state']
if params:
keys = [k for k in model_dict.keys()]
for key in keys:
if key not in params:
del model_dict[key]
if relation:
for r in relation:
rel = eval('self.{0}'.format(r))
result = [self.change_str(operation, x.column_dict()) for x in rel]
model_dict['{0}'.format(r)] = result
if operation:
model_dict = self.change_str(operation, model_dict)
if isinstance(model_dict, str):
return False, 'model_dict', 0
return model_dict
Base.column_dict = column_dict
@staticmethod
def change_str(operation, model_dict):
'''
改變輸出型別
:param operation:
:param model_dict:
:return:
'''
for key, funcs in operation.items():
method = model_dict[key]
func = funcs.get('one')
second_func = funcs.get('two')
param = funcs.get('param')
if param and func and second_func:
model_dict[key] = func(method) if method else second_func(param)
elif second_func is None and func and param:
model_dict[key] = func(method) if method else param
elif param is None and func and second_func:
model_dict[key] = func(method) if method else second_func()
else:
return '操作函式設定錯誤'
return model_dict
#呼叫
parent = session.query(Parent)
operation = {
'name: {'one': func, 'param': '引數'},
}
relation = ['children']
result = [x.column_dict(operation=operation, relation=relation) for x in parent]
這個方法也可以不傳引數,直接呼叫,可以直接返回字典,返回的是所有欄位,如果要自己選定欄位需要傳引數params=‘欄位1,欄位二’,這個方法可以關聯很多表,但速度可想而知。在不關聯的情況下查詢速度180ms 左右,關聯一個表600ms 左右,關聯 二個 1000ms左右,以此類推。還是那句話,不建議列表中關聯查詢,更不要用這樣的多對多的關係。
sqlalchemy自動關係
看到這裡你可能會想,太麻煩了,有沒有能自動關聯的關係呢,答案是有的,在relationship 中有個引數cascade
cascade 所有的可選字串項是:
all , 所有操作都會自動處理到關聯物件上.
save-update , 關聯物件自動新增到會話.
delete , 關聯物件自動從會話中刪除.
delete-orphan , 屬性中去掉關聯物件, 則會話中會自動刪除關聯物件.**只適用於一對多的關係**
merge , session.merge() 時會處理關聯物件.
refresh-expire , session.expire() 時會處理關聯物件.
expunge , session.expunge() 時會處理關聯物件.
delete
class Blog(BaseModel):
__tablename__ = 'blog'
id = Column(BIGINT, primary_key=True, autoincrement=True)
title = Column(String(64), server_default='', nullable=False)
user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)
class User(BaseModel):
__tablename__ = 'user'
id = Column(BIGINT, primary_key=True, autoincrement=True)
name = Column(String(32), server_default='', nullable=False)
blog_list = relationship('Blog', cascade='save-update, delete')
if __name__ == '__main__':
session = Session()
#user = User(name=u'使用者')
#user.blog_list = [Blog(title=u'哈哈')]
#session.add(user)
user = session.query(User).first()
session.delete(user)
session.commit()
merge 有則修改,無則建立
class Blog(BaseModel):
__tablename__ = 'blog'
id = Column(BIGINT, primary_key=True, autoincrement=True)
title = Column(String(64), server_default='', nullable=False)
user = Column(BIGINT, ForeignKey('user.id'), index=True, nullable=False)
class User(BaseModel):
__tablename__ = 'user'
id = Column(BIGINT, primary_key=True, autoincrement=True)
name = Column(String(32), server_default='', nullable=False)
blog_list = relationship('Blog', cascade='save-update, delete, delete-orphan, merge')
if __name__ == '__main__':
session = Session()
user = User(id=1, name='1')
session.add(user)
session.commit(user)
user = User(id=1, blog_list=[Blog(title='哈哈')])
session.merge(user)
session.commit()
屬性代理
from sqlalchemy.ext.associationproxy import association_proxy
class Blog(BaseModel):
__tablename__ = 'blog'
id = Column(Integer, autoincrement=True, primary_key=True)
title = Column(Unicode(32), nullable=False, server_default='')
user = Column(Integer, ForeignKey('user.id'), index=True)
#對應的屬性代理設定
def __init__(self, title):
self.title = title
class User(BaseModel):
__tablename__ = 'user'
id = Column(Integer, autoincrement=True, primary_key=True)
name = Column(Unicode(32), nullable=False, server_default='')
blog_list = relationship('Blog')
blog_title_list = association_proxy('blog_list', 'title')
有的時候我們只需要處理對應的一個,比如上面這個,我們指出了blog_list 中的title
#設定與呼叫
session = Session()
user = User(name='xxx')
user.blog_list = [Blog(title='ABC')]
session.add(user)
session.commit()
user = session.query(User).first()
print user.blog_title_list
#如果我們希望像下面這樣更改資料
user = session.query(User).first()
user.blog_title_list = ['NEW']
session.add(user)
session.commit()
#需要在代理設定那個類中新增對應的 __init__ 方法
def __init__(self, title):
self.title = title
#也可以這樣
blog_title_list = association_proxy('blog_list', 'title',creator=lambda t: User(title=t))
查詢方法
class Blog(BaseModel):
__tablename__ = 'blog'
id = Column(Integer, autoincrement=True, primary_key=True)
title = Column(Unicode(32), server_default='')
user = Column(Integer, ForeignKey('user.id'), index=True)
user_obj = relationship('User')
user_name = association_proxy('user_obj', 'name')
查詢:
blog = session.query(Blog).filter(Blog.user_name == u'XX').first()
反過來查詢
user = session.query(User).filter(User.blogs_title.contains('A')).first()