基於SQLAlchemy實現操作MySQL並執行原生sql語句
阿新 • • 發佈:2020-06-26
場景應用
老大我讓爬取內部網站獲取資料,插入到新建的表中,並每天進行爬取更新資料(後面做了定時任務)。然後根據該表統計每日的新增數量/更新數量進行製圖製表,向上級彙報。
思路構建
選用sqlalchemy+mysqlconnector,連線資料庫,建立表,對指定表進行CRUD
from sqlalchemy import exists,Column,Integer,String,ForeignKey,DateTime,Text,func from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from conf.parseConfig import parseConf # 從配置檔案中獲取資料庫資訊 host = parseConf.get_conf('MySQLInfo','host') port = parseConf.get_conf('MySQLInfo','port') dbname = parseConf.get_conf('MySQLInfo','dbname') usernm = parseConf.get_conf('MySQLInfo','usernm') passwd = parseConf.get_conf('MySQLInfo','passwd') # 連線資料庫 engine_str = "mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm,passwd,host,port,dbname) # 建立的資料庫引擎 engine = create_engine(engine_str,encoding='utf-8') #建立session型別 DBSession = sessionmaker(bind=engine) # 建立session物件,進行增刪改查: session = DBSession() # 例項化官宣模型 - Base 就是 ORM 模型 Base = declarative_base() # 建立服務單表 繼承Base基類 class ServiceOrder(Base): __tablename__ = 'serviceOrderTable' serviceOrderId = Column(String(32),primary_key=True,comment='服務單ID') serviceDesc = Column(String(512),comment='服務說明') transferTimes = Column(String(32),comment='轉派次數') # 建立更新時間,對資料的更新進行記錄 updateTime = Column(DateTime,server_default=func.now(),onupdate=func.now()) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) if __name__ == "__main__": # 每次執行時 會判斷表的存在性 對於資料庫中不存在的表進行建立 已存在的表則可以直接進行增刪改查 init_db() ### 首先講一下使用sqlalchemy執行原生的sql語句### # 方式一: res = session.execute('select * from ServiceOrder') # res是獲取的物件 all_res_list = res.fetchall() # all_res_list具體的結果 是列表 print(all_res_list ) # 結果: [('資料提取',),('非資料提取',)] # 方式二: conn = engine.connect() res = conn.execute('select * from ServiceOrder') all_res_list = res.fetchall() ### 使用建立好的session物件進行增刪改查 ### # 插入單條資料 # 建立新service0rder物件 new_serviceorder = ServiceOrder(serviceOrderId='001',serviceDesc='ack',transferTimes='9') # 新增到session session.add(new_serviceorder) # 提交即儲存到資料庫 session.commit() # 插入多條資料 serviceorder_list = [ServiceOrder(serviceOrderId='002',serviceDesc='好的',transferTimes='9'),ServiceOrder(serviceOrderId='003',serviceDesc='起床',transferTimes='9')] session.add_all(serviceorder_list) session.commit() # session.close() # 查詢 # 查詢是否存在 結果是布林值 it_exists = session.query( exists().where(ServiceOrder.serviceOrderId == '002') ).scalar() # 建立Query查詢,filter是where條件 # 呼叫one() first()返回唯一行,如果呼叫all()則已列表的形式返回所有行: server_order = session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == '003').first() print(server_order.serviceDesc) serciceorders = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == '好的').all() # 改 更新資料 # 資料更新,將值為Mack的serviceDesc修改為Danny update_obj = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').update({"serviceDesc": "Danny"}) # 或則 update_objp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').first() update_objp.serviceDesc = 'Danny' session.commit() # 刪除 update_objk = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').delete() # 或則 update_objkp = session.query(ServiceOrder).filter(ServiceOrder.serviceDesc == 'Mack').one() update_objkp.delete() session.commit() session.close()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。