sqlalchemy oracle 批量新增記錄
阿新 • • 發佈:2018-12-14
sqlalchemy oracle 批量新增記錄
- 前提:要先安裝 cx_Oracle
自己封裝了一些常用的操作
import logging
import math

from sqlalchemy import orm
logger = logging.getLogger(__name__)


class Connect(object):
    """Convenience wrapper around a SQLAlchemy ORM session.

    Holds the engine passed to the constructor and a session created from it,
    and exposes helpers for batched inserts, committing, and running
    statements through the session.
    """

    # Instance cached by get_instance(); stays None until the first call.
    instance = None

    @classmethod
    def get_instance(cls, engine):
        """Return the cached instance, creating it from *engine* on first use.

        Once an instance is cached, later calls return it and ignore *engine*.
        """
        if cls.instance is None:
            cls.instance = cls(engine)
        return cls.instance

    def __init__(self, engine):
        """Store *engine* and immediately open a session on it."""
        self.engine = engine
        self.session = None
        self.connect_session()

    def connect_session(self):
        """Check that the engine can connect, then create ``self.session``.

        Raises:
            Exception: with the message "資料庫連線失敗" when the connectivity
                check fails; the original error is chained as the cause.
        """
        try:
            # Probe connectivity only; close the connection right away so it
            # is not left checked out.
            self.engine.connect().close()
        except Exception as e:
            logger.error(e)
            raise Exception("資料庫連線失敗") from e
        session_factory = orm.sessionmaker(bind=self.engine)
        self.session = session_factory()

    def add_all(self, records, batch_size=10000):
        """Add *records* to the session and commit them batch by batch.

        Args:
            records: Sequence of objects to add; must support ``len()`` and
                slicing. An empty (or None) value is a no-op.
            batch_size: Maximum number of records added per commit.

        Raises:
            ValueError: if *batch_size* is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        if not records:
            return
        for start in range(0, len(records), batch_size):
            # TODO: temporary workaround kept from the original code for the
            # "cannot submit repeatedly" problem.
            # NOTE(review): this overwrites a private attribute of the
            # session's identity map; a public Session API would be safer —
            # confirm which one matches the intended behaviour.
            self.session.identity_map._dict = {}
            self.session.add_all(records[start:start + batch_size])
            # A failed commit is rolled back inside save(); the remaining
            # batches are still attempted.
            self.save()

    def add(self, record):
        """Add a single *record* to the session without committing."""
        self.session.add(record)

    def save(self):
        """Commit the session, rolling back if the commit raises.

        Returns:
            bool: True when the commit succeeded, False when it failed and
            the session was rolled back.
        """
        try:
            self.session.commit()
        except Exception:
            logger.exception("commit failed; rolling back")
            self.session.rollback()
            return False
        return True

    def execute_ext(self, sql):
        """Execute *sql* on the session and return the raw result object.

        Errors are not caught here; they propagate to the caller.
        """
        return self.session.execute(sql)

    def execute(self, sql):
        """Execute *sql* and return every row via ``fetchall()``.

        Returns None after logging and rolling back if anything raises.
        """
        try:
            return self.session.execute(sql).fetchall()
        except Exception as e:
            logger.error(e)
            self.session.rollback()
            return None

    def oneexecute(self, sql):
        """Execute *sql* and return only the first row via ``first()``.

        Returns None after logging and rolling back if anything raises.
        """
        try:
            return self.session.execute(sql).first()
        except Exception as e:
            logger.error(e)
            self.session.rollback()
            return None

    def query(self, query):
        """Return ``self.session.query(query)`` for further chaining."""
        return self.session.query(query)

    def close(self):
        """Close the underlying session."""
        self.session.close()
第一步: 宣告一個engine
# Imported here because this statement runs before the SQLAlchemy import
# that appears further down in the file.
from sqlalchemy import create_engine

# NOTE(review): the "[email protected]" fragment of the URL looks like an
# e-mail-obfuscation artifact standing in for "<password>@<host>" — TODO
# confirm and supply the real credentials and host.
engine_db = create_engine(
    "oracle+cx_oracle://admin:[email protected]/orcl",
    coerce_to_unicode=True,
    max_overflow=100,
    pool_size=100,
    pool_recycle=3600,
)
第二步: 獲取一個connection
def session_db():
    """Build and return a new ``Connect`` wrapper bound to ``engine_db``."""
    connection = Connect(engine_db)
    return connection
第三步: 匯入SQLAlchemy並定義對映類別(User)
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# Create the declarative base class that mapped model classes inherit from:
Base = declarative_base()
# Define the User model:
class User(Base):
    """ORM model mapped to the ``user`` table."""

    # Name of the table:
    __tablename__ = "user"

    # Table structure:
    id = Column(String(length=20), primary_key=True)
    name = Column(String(length=20))
第四步:向資料庫中批量新增資料
# Create the session wrapper:
session = session_db()
try:
    # Build the new User objects. `id` is the primary key, so every row gets
    # its own value instead of all 10000 rows sharing the same key.
    list_user = [User(id=str(i), name='Bob') for i in range(10000)]
    # Add the objects through the wrapper's add_all():
    session.add_all(list_user)
    # Commit so that anything still pending is stored in the database:
    session.save()
finally:
    # Close the session even if one of the steps above raised:
    session.close()