1. 程式人生 > 資料庫 >基於SQLAlchemy實現操作MySQL並執行原生sql語句

基於SQLAlchemy實現操作MySQL並執行原生sql語句

場景應用

老大我讓爬取內部網站獲取資料,插入到新建的表中,並每天進行爬取更新資料(後面做了定時任務)。然後根據該表統計每日的新增數量/更新數量進行製圖製表,向上級彙報。

思路構建

選用sqlalchemy+mysqlconnector,連線資料庫,建立表,對指定表進行CRUD

from sqlalchemy import exists,Column,Integer,String,ForeignKey,DateTime,Text,func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from conf.parseConfig import parseConf

# 從配置檔案中獲取資料庫資訊
host = parseConf.get_conf('MySQLInfo','host')
port = parseConf.get_conf('MySQLInfo','port')
dbname = parseConf.get_conf('MySQLInfo','dbname')
usernm = parseConf.get_conf('MySQLInfo','usernm')
passwd = parseConf.get_conf('MySQLInfo','passwd')
# 連線資料庫
engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm,passwd,host,port,dbname)
# 建立的資料庫引擎
engine = create_engine(engine_str,encoding='utf-8')

#建立session型別
DBSession = sessionmaker(bind=engine)
# 建立session物件,進行增刪改查:
session = DBSession()

# 例項化官宣模型 - Base 就是 ORM 模型
Base = declarative_base()


# 建立服務單表 繼承Base基類
class ServiceOrder(Base):
  __tablename__ = 'serviceOrderTable'
  serviceOrderId = Column(String(32),primary_key=True,comment='服務單ID')
  serviceDesc = Column(String(512),comment='服務說明')
  transferTimes = Column(String(32),comment='轉派次數')
  # 建立更新時間,對資料的更新進行記錄
  updateTime = Column(DateTime,server_default=func.now(),onupdate=func.now())


def init_db():
  Base.metadata.create_all(engine)


def drop_db():
  Base.metadata.drop_all(engine)


if __name__ == "__main__":
  # 每次執行時 會判斷表的存在性 對於資料庫中不存在的表進行建立 已存在的表則可以直接進行增刪改查
  init_db()

  ### 首先講一下使用sqlalchemy執行原生的sql語句###
  # 方式一:
  res = session.execute('select * from ServiceOrder') # res是獲取的物件
  all_res_list = res.fetchall() # all_res_list具體的結果 是列表
  print(all_res_list ) # 結果: [('資料提取',),('非資料提取',)]
  # 方式二:
  conn = engine.connect()
  res = conn.execute('select * from ServiceOrder') 
  all_res_list = res.fetchall()

  ### 使用建立好的session物件進行增刪改查 ###
  # 插入單條資料
  # 建立新service0rder物件
  new_serviceorder = ServiceOrder(serviceOrderId='001',serviceDesc='ack',transferTimes='9')
  # 新增到session
  session.add(new_serviceorder)
  # 提交即儲存到資料庫
  session.commit()

  # 插入多條資料
  serviceorder_list = [ServiceOrder(serviceOrderId='002',serviceDesc='好的',transferTimes='9'),ServiceOrder(serviceOrderId='003',serviceDesc='起床',transferTimes='9')]
  session.add_all(serviceorder_list)
  session.commit()
  # session.close()

  # 查詢
  # 查詢是否存在 結果是布林值
  it_exists = session.query(
    exists().where(ServiceOrder.serviceOrderId == '002')
  ).scalar()
  # 建立Query查詢,filter是where條件
  # 呼叫one() first()返回唯一行,如果呼叫all()則已列表的形式返回所有行:
  server_order = session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == '003').first()
  print(server_order.serviceDesc)
  serciceorders = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == '好的').all()

  # 改 更新資料
  # 資料更新,將值為Mack的serviceDesc修改為Danny
  update_obj = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').update({"serviceDesc": "Danny"})
  # 或則
  update_objp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').first()
  update_objp.serviceDesc = 'Danny'
  session.commit()

  # 刪除
  update_objk = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').delete()
  # 或則
  update_objkp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').one()
  update_objkp.delete()
  session.commit()
  session.close()

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。