python使用DBUtils連線部分主流資料庫
需要了解的知識
首先呢,你需要了解下DBUtils,我的描述肯定沒官網解釋來的清晰,自行閱讀後,你就會發現我為什麼會選用PooledDB而不是其他作為連線池了。
其次,DBUtils支援所有遵循DP-API 2規範的資料庫連線模組,也就是說除了我示例中所提供的幾個資料庫連線方式外,各位可以探索其他遵循此標準的連線模組,從而在此基礎上拓展,成為連線更多種類資料庫的通用工具類。
最後,以下內容均基於python3。
---------------------------------------------------------
10.23增補內容:支援hbase,更新字典返回方式以及部分方法擴充套件。
準備工作
首先,pip下支援DB-API 2規範的相關資料庫連線模組和DBUtils。
pip install DBUtils
pip install pymysql(mysql)
pip install pymssql(sqlserver)
pip install cx_Oracle(oracle)
pip install phoenixdb(hbase)
pip install sqlite3(sqlite3 python自帶)
其次,需要準備一份配置檔案,姑且命名為pdbc.properties,以下是示例,根據資料庫的連線資訊進行修改即可。
# 資料庫型別,支援mysql,oracle,sqlserver,sqlite3,hbase # -------------------------------------------------------------- # mysql # 連線資料庫host host_mysql=ip # 連線資料庫port port_mysql=3306 # 連線資料庫庫名 database_mysql=dbname # 使用者 user_mysql=username # 密碼 password_mysql=password # 字符集 charset_mysql=utf8 # -------------------------------------------------------------- # oracle # 連線資料庫host host_orc=ip # 連線資料庫port port_orc=1521 # 連線資料庫庫名 database_orc=dbname # 使用者 user_orc=username # 密碼 password_orc=password # 字符集 nencoding_orc=utf8 # -------------------------------------------------------------- # sqlserver # 連線資料庫host host_ms=ip # 連線資料庫port port_ms=1433 # 連線資料庫庫名 database_ms=dbname # 使用者 user_ms=username # 密碼 password_ms=password # 字符集 charset_ms=utf8 # -------------------------------------------------------------- # sqlite3 # 連線資料庫檔名,sqlite不支援加密,不使用使用者名稱和密碼 database_sqlite3=path/to/your/dbname.db # -------------------------------------------------------------- # hbase # 連線資料庫host host_hb=ip # 連線資料庫port,phoenixdb連線使用8765埠而非2181等其他埠 port_hb=8765 # 使用者 user_hb=username # 密碼 password_hb=password
然後,準備一份讀取properties檔案的工具類,姑且稱為PropertiesUtil.py,可以尋找網上的,也可以參考我寫的。
# -*- coding:utf-8 -*- class PropertiesUtil(object): # 快取配置 __file_dict = {} def get_config_dict(self, file_path="pdbc.properties"): """ 獲取資原始檔,形成字典 :param file_path: 檔案路徑 :return:字典內容的key、value均為字串 """ if file_path not in self.__file_dict: properties = {} with open(file_path, 'r', encoding='UTF-8') as pro_file: for line in pro_file.readlines(): line = line.strip().replace('\n', '') if line.find('=') > 0 and not line.startswith('#'): strs = line.split('=') value = line[len(strs[0]) + 1:] self.__get_dict(strs[0].strip(), properties, value.strip()) self.__file_dict[file_path] = properties return self.__file_dict[file_path] def get_config_value(self, file_path, prop_name): """ 獲取資原始檔,形成字典,獲取屬性值 :param file_path: 檔案路徑 :param prop_name: 屬性名稱 :return: 返回字串格式的屬性值 """ return self.get_config_dict(file_path)[prop_name] def __get_dict(self, dict_name, properties, value): """ 遞迴獲取配置字典 :param dict_name:鍵 :param properties: 字典 :param value: 值 :return: """ if dict_name.find('.') > 0: key = dict_name.split('.')[0] properties.setdefault(key, {}) self.__get_dict(dict_name[len(key) + 1:], properties[key], value) else: properties[dict_name] = value # 獲取例項,保持單例 prop = PropertiesUtil() if __name__ == "__main__": # 呼叫方式,獲取例項 # from util.ConfigUtil import prop print(prop.get_config_dict("pdbc.properties")) print(prop.get_config_value("pdbc.properties", "dbtype"))
大概是重點來了
有幾點要先提一下。
1.我這裡僅提供增刪改查基本功能,其他諸如儲存過程、函式等內容我自己也在探索中,故不列出。
2.使用importlib來實現動態載入,因為我不太喜歡開始就匯入所有需要的模組,畢竟連線池不是用來同時連線所有型別資料庫的。
3.PooledDB和建立連線時的config,我僅羅列了幾項基本引數,更多的煩請自行查詢資料。(這點是真的不好意思,因為本人只熟悉mysql相關內容,而且還很懶~~)
4.mysql和mssql語句的引數使用%s作為佔位符,oracle和sqlite使用:數字作為佔位符,sqllite還可以用?作為佔位符,詳情可以見程式碼中main函式示例。
5.測試用的表名為TEST2,有兩個欄位,id 主鍵 數字型別,name 字串型別。注意sqlserver的字串請使用nvarchar型別,不然返回結果可能會亂碼。(至於為什麼不給建表語句的原因,算了,不編了,就是懶~~)
6. hbase插入語句的引數使用:數字或者?作為佔位符,hbase的INSERT請使用UPSERT替換。且hbase中'autocommit': True配置一定要,否則插入刪除語句執行無效。
囉嗦了這麼多,下面上程式碼。
# -*- coding:utf-8 -*-
"""
Description: DB工具類
@author: WangLeAi
@date: 2018/9/18
"""
from util.PropertiesUtil import prop
from DBUtils.PooledDB import PooledDB
import importlib
class DbPoolUtil(object):
def __init__(self, config_file='config/pdbc.properties', db_type='mysql'):
"""
初始化
:param config_file: 配置檔案地址
:param db_type: 資料庫型別,支援 mysql, oracle, sqlserver, sqlite, hbase
"""
properties_dic = prop.get_config_dict(config_file)
self.__db_type = db_type
if self.__db_type == "mysql":
config = {
'host': properties_dic['host_mysql'],
'port': int(properties_dic['port_mysql']),
'database': properties_dic['database_mysql'],
'user': properties_dic['user_mysql'],
'password': properties_dic['password_mysql'],
'charset': properties_dic['charset_mysql']
}
db_creator = importlib.import_module("pymysql")
self.__pool = PooledDB(db_creator, maxcached=50, maxconnections=1000, maxusage=1000, **config)
elif self.__db_type == "oracle":
config = {
'user': properties_dic['user_orc'],
'password': properties_dic['password_orc'],
'dsn': "/".join(
[":".join([properties_dic['host_orc'], properties_dic['port_orc']]),
properties_dic['database_orc']]),
'nencoding': properties_dic['nencoding_orc']
}
db_creator = importlib.import_module("cx_Oracle")
self.__pool = PooledDB(db_creator, maxcached=50, maxconnections=1000, maxusage=1000, **config)
elif self.__db_type == "sqlserver":
config = {
'host': properties_dic['host_ms'],
'port': int(properties_dic['port_ms']),
'database': properties_dic['database_ms'],
'user': properties_dic['user_ms'],
'password': properties_dic['password_ms'],
'charset': properties_dic['charset_ms']
}
db_creator = importlib.import_module("pymssql")
self.__pool = PooledDB(db_creator, maxcached=50, maxconnections=1000, maxusage=1000, **config)
elif self.__db_type == "sqlite":
config = {
'database': properties_dic['database_sqlite3']
}
db_creator = importlib.import_module("sqlite3")
self.__pool = PooledDB(db_creator, maxcached=50, maxconnections=1000, maxusage=1000, **config)
elif self.__db_type == "hbase":
# 'autocommit': True配置一定要,否則插入刪除語句執行無效
config = {
'url': 'http://{0}:{1}'.format(properties_dic['host_hb'], properties_dic['port_hb']),
'user': properties_dic['user_hb'],
'password': properties_dic['password_hb'],
'autocommit': True
}
db_creator = importlib.import_module("phoenixdb")
self.__pool = PooledDB(db_creator, maxcached=50, maxconnections=1000, maxusage=1000, **config)
else:
raise Exception("unsupported database type " + self.__db_type)
def execute_query(self, sql, dict_mark=False, args=()):
"""
執行查詢語句,獲取結果
:param sql:sql語句,注意防注入
:param dict_mark:是否以字典形式返回,預設為False
:param args:傳入引數
:return:結果集
"""
result = []
conn = self.__pool.connection()
cur = conn.cursor()
try:
if dict_mark:
cur.execute(sql, args)
# name為description的第一個內容,表示為欄位名
fields = [desc[0] for desc in cur.description]
rst = cur.fetchall()
if rst:
result = [dict(zip(fields, row)) for row in rst]
else:
cur.execute(sql, args)
result = cur.fetchall()
except Exception as e:
print('異常資訊:' + str(e))
cur.close()
conn.close()
return result
def execute_query_single(self, sql, dict_mark=False, args=()):
"""
執行查詢語句,獲取單行結果
:param sql:sql語句,注意防注入
:param dict_mark:是否以字典形式返回,預設為False
:param args:傳入引數
:return:結果集
"""
result = []
conn = self.__pool.connection()
cur = conn.cursor()
try:
if dict_mark:
cur.execute(sql, args)
# name為description的第一個內容,表示為欄位名
fields = [desc[0] for desc in cur.description]
rst = cur.fetchone()
if rst:
result = dict(zip(fields, rst))
else:
cur.execute(sql, args)
result = cur.fetchone()
except Exception as e:
print('異常資訊:' + str(e))
cur.close()
conn.close()
return result
def execute_iud(self, sql, args=()):
"""
執行增刪改語句
:param sql:sql語句,注意防注入
:param args:傳入引數
:return:影響行數,mysql和sqlite有返回值
"""
conn = self.__pool.connection()
cur = conn.cursor()
count = 0
try:
result = cur.execute(sql, args)
conn.commit()
if self.__db_type == "mysql":
count = result
if self.__db_type == "sqlite3":
count = result.rowcount
except Exception as e:
print('異常資訊:' + str(e))
conn.rollback()
cur.close()
conn.close()
return count
def execute_many_iud(self, sql, args):
"""
批量執行增刪改語句
:param sql:sql語句,注意防注入
:param args:引數,內部元組或列表大小與sql語句中引數數量一致
:return:影響行數,mysql和sqlite有返回值
"""
conn = self.__pool.connection()
cur = conn.cursor()
count = 0
loopK = 5000
try:
k = len(args)
if k > loopK:
n = k // loopK
for i in range(n):
arg = args[(i * loopK): ((i + 1) * loopK)]
cur.executemany(sql, arg)
conn.commit()
arg = args[(n * loopK):]
if len(arg) > 0:
cur.executemany(sql, arg)
conn.commit()
else:
result = cur.executemany(sql, args)
conn.commit()
if self.__db_type == "mysql":
count = result
if self.__db_type == "sqlite3":
count = result.rowcount
except Exception as e:
print('異常資訊:' + str(e))
conn.rollback()
cur.close()
conn.close()
return count
def execute_proc(self, proc_name, args=()):
"""
執行儲存過程,mysql適用
:param proc_name:儲存過程/函式名
:param args:引數
:return:result為結果集,args_out為引數最終結果(用於out,順序與傳參一致)
"""
result = ()
args_out = ()
conn = self.__pool.connection()
cur = conn.cursor()
try:
cur.callproc(proc_name, args)
result = cur.fetchall()
if args:
sql = "select " + ",".join(["_".join(["@", proc_name, str(index)]) for index in range(len(args))])
cur.execute(sql)
args_out = cur.fetchone()
conn.commit()
except Exception as e:
print('異常資訊:' + str(e))
conn.rollback()
cur.close()
conn.close()
return result, args_out
def loop_row(self, obj, fun_name, sql, args=()):
"""
執行查詢語句,並且對遊標每行結果反射呼叫某個處理方法
主要是考慮一些表記錄太大時,不能一次性取出,遊標式取資料
:param obj: 物件或者模組
:param fun_name:呼叫方法名
:param sql:sql語句,注意防注入
:param args:傳入引數
:return:
"""
conn = self.__pool.connection()
cur = conn.cursor()
try:
cur.execute(sql, args)
fun = getattr(obj, fun_name)
while True:
row = cur.fetchone()
if row is None:
break
fun(row)
except Exception as e:
print('異常資訊:' + str(e))
cur.close()
conn.close()
def loop_row_custom(self, sql, args=()):
"""
執行查詢語句,並且對遊標每行結果執行某些操作或者直接返回生成器
主要是考慮一些表記錄太大時,不能一次性取出,遊標式取資料
:param sql:sql語句,注意防注入
:param args:傳入引數
:return:
"""
conn = self.__pool.connection()
cur = conn.cursor()
try:
cur.execute(sql, args)
while True:
row = cur.fetchone()
if row is None:
break
# 在此編寫你想做的操作
print(row)
except Exception as e:
print('異常資訊:' + str(e))
cur.close()
conn.close()
# if __name__ == "__main__":
# 使用demo,工作目錄在專案目錄的前提下,使用表為TEST2表
# dbpool_util = DbPoolUtil(db_type="mysql")
# sql1 = """DELETE FROM TEST2"""
# result1 = dbpool_util.execute_iud(sql1)
# print(result1)
# mysql和mssql語句的引數使用%s作為佔位符,oracle和sqlite使用:數字作為佔位符(sqllite還可以用?作為佔位符)
# hbase插入語句的引數使用:數字或者?作為佔位符,hbase的INSERT請使用UPSERT替換
# sql2 = """INSERT INTO TEST2(id,name) VALUES (%s,%s)"""
# sql2 = """INSERT INTO TEST2(id,name) VALUES (:1,:2)"""
# sql2 = """UPSERT INTO TEST2(id,name) VALUES (?,?)"""
# test_args2 = [(1, '王'), (2, '葬愛'), (3, 'shao'), (5, 'nian'), (8, 'wang')]
# result2 = dbpool_util.execute_many_iud(sql2, test_args2)
# print(result2)
# sql3 = """SELECT id as wangleai,name as zangai FROM TEST2 """
# result3 = dbpool_util.execute_query(sql3)
# print(result3)
# result3 = dbpool_util.execute_query_single(sql3)
# print(result3)
# result3 = dbpool_util.execute_query(sql3, dict_mark=True)
# print(result3)
# result3 = dbpool_util.execute_query_single(sql3, dict_mark=True)
# print(result3)
# dbpool_util.loop_row_custom(sql3)
# 此處反射呼叫相關方法,檔案就不給了,嫌麻煩
# from util.ClassTest import ClsTest
# cla_test = ClsTest()
# dbpool_util.loop_row(cla_test, "print_row", sql3)
#
# import util.ModuleTest as mod_test
#
# dbpool_util.loop_row(mod_test, "print_row", sql3)
# sql4 = """SELECT id,name FROM TEST2 where id = %s"""
# sql4 = """SELECT id,name FROM TEST2 where id = :1"""
# test_args4 = (3,)
# result4 = dbpool_util.execute_query(sql4, args=test_args4)
# print(result4)
以上,目前支援mysql,oracle,sqlserver,sqlite3和hbase。
後記
啊,好久沒寫長的了,之前都是短、短、短,偶爾也……算了,還是短舒服~~