1. 程式人生 > >sqlalchemy oracle 批量新增記錄

sqlalchemy oracle 批量新增記錄

  • 前提:要安裝 cx_oracle

自己封裝了一些常用的操作


import math
from sqlalchemy import orm

class Connect(object):
    instance = None
    @classmethod
    def get_instance(cls, engine):
        if cls.instance:
            return cls.instance
        else:
            obj = cls(engine)
            cls.instance = obj
            return
obj def __init__(self, engine): self.engine = engine self.session = None self.connect_session() def connect_session(self): try: self.engine.connect() except Exception as e: print(e) raise Exception("資料庫連線失敗") dbSession =
orm.sessionmaker(bind=self.engine) self.session = dbSession() def add_all(self, records): ''' 指新增記錄 :param records: :return: ''' if len(records) == 0: return post_count = 10000 record_count = len(records) post =
math.ceil(record_count / post_count) for i in range(post): begin_index = i * post_count if i + 2 <= post: end_index = (i + 1) * post_count else: end_index = record_count # TODO 暫時先這麼處理 解決不能重複提交的問題 self.session.identity_map._dict = {} self.session.add_all(records[begin_index:end_index]) self.save() def add(self, record): self.session.add(record) def save(self): try: self.session.commit() except Exception as e: print(e) self.session.rollback() # 執行sql語句 def execute_ext(self, sql): return self.session.execute(sql) # 返加是資料 [] def execute(self, sql): try: return self.session.execute(sql).fetchall() except Exception as e: logger.error(e) self.session.rollback() # 返回一條資料元組() def oneexecute(self, sql): try: return self.session.execute(sql).first() except Exception as e: logger.error(e) self.session.rollback() def query(self, query): return self.session.query(query) def close(self): self.session.close()

宣告一個engine

engine_db = create_engine("oracle+cx_oracle://{}:{}@{}/{}".format(
    SERVER_USER,
    SERVER_PASS,
    SERVER_ADDRESS,
    SERVICE_NAME), coerce_to_unicode=True, max_overflow=100, pool_size=100, pool_recycle=3600)
def session_db():
    return Connect(engine_db)

未完