Python連線池工具-DBUtils
阿新 • • 發佈:2021-06-26
使用連線池意義
減少頻繁和資料庫建立連線從而耗費資源
pip install DBUtils 安裝
第一步:建立資料庫配置檔案db_config.py
import MySQLdb # 資料庫資訊 DB_TEST_HOST = "192.168.4.96" DB_TEST_PORT = 3306 DB_TEST_DBNAME = "restframework" DB_TEST_USER = "root" DB_TEST_PASSWORD = "110396" # creator:資料庫驅動模組,如常見的pymysql,pymssql,cx_Oracle模組。無預設值 # mincached:初始化連線池時建立的連線數。預設為0,即初始化時不建立連線。(建議預設0,假如非0的話,在某些資料庫不可用時,整個專案會啟動不了) # maxcached:池中空閒連線的最大數量。預設為0,即無最大數量限制。(建議預設) # maxshared:池中共享連線的最大數量。預設為0,即每個連線都是專用的,不可共享(不常用,建議預設) # maxconnections:被允許的最大連線數。預設為0,無最大數量限制。(視情況而定) # blocking:連線數達到最大時,新連線是否可阻塞。預設False,即達到最大連線數時,再取新連線將會報錯。(建議True,達到最大連線數時,新連線阻塞,等待連線數減少再連線) # maxusage:連線的最大使用次數。預設0,即無使用次數限制。(建議預設) # setsession:可選的SQL命令列表,可用於準備會話。(例如設定時區) # reset:當連線返回到池中時,重置連線的方式。預設True,總是執行回滾。(不太清楚,建議預設) # ping:確定何時使用ping()檢查連線。預設1,即當連線被取走,做一次ping操作。0是從不ping,1是預設,2是當該連線建立遊標時ping,4是執行sql語句時ping,7是總是ping # 資料庫連線編碼 DB_CHARSET = "utf8" # 開始時的閒置連線數量 DB_MINCACHED = 0 DB_MAXCACHED = 0 DB_MAX_SHARED = 5 # 最大連線數 DB_MAX_CONNECTIONS = 100 DB_BLOCKING = True DB_CREATOR = MySQLdb
第二步:資料庫連線池初始化類DbConnector
from dbutils.pooled_db import PooledDB import extensions.dataBaseConfig as dbConfig """ 建立資料庫連線池 """ import logging logger = logging.getLogger("django") class MyDbConnect: __pool = None def __init__(self): self.conn = self.__getConn() self.cursor = self.conn.cursor() def __getConn(self): """ :return: 返回連線物件 """ if self.__pool is None: self.__pool = PooledDB( creator=dbConfig.DB_CREATOR, blocking=dbConfig.DB_BLOCKING, host=dbConfig.DB_TEST_HOST, port=dbConfig.DB_TEST_PORT, user=dbConfig.DB_TEST_USER, password=dbConfig.DB_TEST_PASSWORD, db=dbConfig.DB_TEST_DBNAME, use_unicode=False, charset=dbConfig.DB_CHARSET ) return self.__pool.connection() # 釋放連線池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() def close(self): self.cursor.close() self.conn.close()
第三步:根據自己的需要建立具體的資料庫使用類
我這裡只使用了多條插入的函式
class MySqlHelper: def __init__(self): self.db = MyDbConnect() self.cursor = self.db.cursor self.conn = self.db.conn def __new__(cls, *args, **kwargs): if not hasattr(cls, 'inst'): cls.inst = super(MySqlHelper, cls).__new__(cls, *args, **kwargs) return cls.inst def insert_many(self,sql,param=None): try: self.db.cursor.executemany(sql, param) self.cursor.executemany(sql, param) self.conn.commit() self.close() return True except Exception as e: logger.error("msg:{}".format(e)) self.conn.rollback() self.close() return False def execute(self,sql,param=None): count = self.db.cursor.execute(sql) self.close() return count def close(self): self.db.close()