1. 程式人生 > >sqlalchemy oracle 批量新增記

sqlalchemy oracle 批量新增記

sqlalchemy oracle 批量新增記錄

  • 前提:要安裝 cx_oracle

自己封裝了一些常用的操作


import math
from sqlalchemy import orm

class Connect(object):
    instance = None
    @classmethod
    def get_instance(cls, engine):
        if cls.instance:
            return cls.instance
        else:
            obj = cls(
engine) cls.instance = obj return obj def __init__(self, engine): self.engine = engine self.session = None self.connect_session() def connect_session(self): try: self.engine.connect() except Exception as e: print
(e) raise Exception("資料庫連線失敗") dbSession = orm.sessionmaker(bind=self.engine) self.session = dbSession() def add_all(self, records): ''' 指新增記錄 :param records: :return: ''' if len(records) == 0: return post_count =
10000 record_count = len(records) post = math.ceil(record_count / post_count) for i in range(post): begin_index = i * post_count if i + 2 <= post: end_index = (i + 1) * post_count else: end_index = record_count # TODO 暫時先這麼處理 解決不能重複提交的問題 self.session.identity_map._dict = {} self.session.add_all(records[begin_index:end_index]) self.save() def add(self, record): self.session.add(record) def save(self): try: self.session.commit() except Exception as e: print(e) self.session.rollback() # 執行sql語句 def execute_ext(self, sql): return self.session.execute(sql) # 返加是資料 [] def execute(self, sql): try: return self.session.execute(sql).fetchall() except Exception as e: logger.error(e) self.session.rollback() # 返回一條資料元組() def oneexecute(self, sql): try: return self.session.execute(sql).first() except Exception as e: logger.error(e) self.session.rollback() def query(self, query): return self.session.query(query) def close(self): self.session.close()

第一步: 宣告一個engine

engine_db = create_engine("oracle+cx_oracle://admin:[email protected]/orcl", coerce_to_unicode=True, max_overflow=100, pool_size=100, pool_recycle=3600)

第二步: 獲取一個connection

def session_db():
    return Connect(engine_db)

第三步: 匯入SQLAlchemy

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

# 建立物件的基類:
Base = declarative_base()

# 定義User物件:
class User(Base):
    # 表的名字:
    __tablename__ = 'user'

    # 表的結構:
    id = Column(String(20), primary_key=True)
    name = Column(String(20))

第四步:向資料中批量新增資料

# 建立session物件:
session = session_db()
# 建立新User物件,插入陣列:
list_user = []
for i in range(10000):
    new_user = User(id='5', name='Bob')
    list_user.append(new_user)
# 新增到session:
session.add_all(list_user)
# 提交即儲存到資料庫:
session.save()
# 關閉session:
session.close()