使用類的繼承實現多個MySQL連線池以及相關操作說明
阿新 • • 發佈:2020-10-10
承接上文
本文是在上一篇文章內容的基礎上的拓展:MySQL連線池DBUtils與執行緒池ThreadPoolExecutor的結合使用例項
需求拓展說明
上一篇文章實現的是一個數據庫db的連線池的操作,但是在實際的業務場景中我們很可能在同一段業務程式碼中使用多個數據庫db,如果只實現了一個數據庫連線池,那麼其他的db還是存在多次建立連結及拆除連結損耗的問題。
筆者在上文的基礎上做了一下優化拓展,可以在同一段邏輯程式碼中使用多個數據庫db的連線池。
實現說明
完整的程式碼建議大家從上面的文章中找出來並親自實現一下,這裡只描述核心的問題。
建立2個測試db
這裡我建立了2個測試的db,作為2個連線池使用:
連線池的配置
# -*- coding:utf-8 -*- # MySQL連線池的配置 MySQLS = { 'students': { "maxconnections": 0, # 連線池允許的最大連線數,0和None表示不限制連線數 "mincached": 2, # 初始化時,連結池中至少建立的空閒的連結,0表示不建立 "maxcached": 0, # 連結池中最多閒置的連結,0和None不限制 "maxusage": 1, # 一個連結最多被重複使用的次數,None表示無限制 "blocking": True, # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯 'host': '127.0.0.1', 'user': 'root', 'password': '123', 'db': 'students', 'port': 3306, 'charset': 'utf8', }, 'students1': { "maxconnections": 0, # 連線池允許的最大連線數,0和None表示不限制連線數 "mincached": 2, # 初始化時,連結池中至少建立的空閒的連結,0表示不建立 "maxcached": 0, # 連結池中最多閒置的連結,0和None不限制 "maxusage": 1, # 一個連結最多被重複使用的次數,None表示無限制 "blocking": True, # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯 'host': '127.0.0.1', 'user': 'root', 'password': '123', 'db': 'students1', 'port': 3306, 'charset': 'utf8', }, }
db_conn.py中多個連線池的實現
# -*- coding:utf-8 -*- import pymysql import logging import traceback from config import MySQLS from DBUtils.PooledDB import PooledDB # 獲取MySQL連線池的配置 # students資料庫連線池的配置 students_pool_config = MySQLS["students"] # students1資料庫連線池的配置 students1_pool_config = MySQLS["students1"] ### 多個MySQL連線池的實現 # 基類 class MySQLPoolBase(object): # with上下文 def __enter__(self): self.conn = self.pool.connection() self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) return self def __exit__(self, exc_type, exc_val, exc_tb): # 關閉連線池 self.cursor.close() self.conn.close() # 插入或修改操作 def insert_or_update(self, sql): try: self.cursor.execute(sql) self.conn.commit() return 1 except Exception as error: print(traceback.format_exc()) # 回滾 self.conn.rollback() # 簡單的日誌處理 logging.error("=======ERROR=======\n%s\nsql:%s" % (error, sql)) raise # 查詢操作 def query(self, sql): try: self.cursor.execute(sql) results = self.cursor.fetchall() return results except Exception as error: # 簡單的日誌處理 logging.error("=======ERROR=======:\n%s\nsql:%s" % (error, sql)) raise # 子類1 students class StudentsPool(MySQLPoolBase): # 類變數 pool = PooledDB(creator=pymysql,**students_pool_config) # 子類2 students1 class Students1Pool(MySQLPoolBase): # 類變數 pool = PooledDB(pymysql,**students1_pool_config)
在同一段業務程式碼中使用多個連線池
這裡使用了with語句支援的簡潔的方法:
def change_db(class_id): # 在這裡面使用MySQL的連線池 # 多個連線池一起使用! with StudentsPool() as students_pool, Students1Pool() as s1_pool: # 這裡每個物件使用的連線池都與類中的連線池一樣!說明所有的執行緒池使用的是同一個MySQL的連線池! print("students_pool_id>>>",id(students_pool.pool)) print("s1_pool_id>>>>>",id(s1_pool.pool))
執行緒池的使用
# -*- coding:utf-8 -*- import time from multiprocessing import Pool from concurrent.futures import ThreadPoolExecutor,wait,as_completed from utils import change_db ### 主函式 def main_func(): # class_id_lst = ["01","02","03","04","05","12","13","14","15"] class_id_lst = ["01","02","03","04","05"] start = time.time() ## 2、執行緒池 + MySQL連線池 executor = ThreadPoolExecutor(max_workers=3) # class_id 是引數 all_tasks = [executor.submit(change_db,class_id) for class_id in class_id_lst] ## 獲取返回值 ———— 本例不用獲取返回值 for future in as_completed(all_tasks): data = future.result() print("執行緒池耗時:{}".format(time.time() - start)) if __name__ == '__main__': main_func()
看一下業務程式碼中列印的類物件中連線池的id
我們可以看到:相同連線池類例項化物件中使用的是同一個pool(這就是pool必須是類變數的原因!),驗證完畢。
students_pool_id>>> 4506440088 s1_pool_id>>>>> 4509501032 students_pool_id>>> 4506440088 s1_pool_id>>>>> 4509501032 students_pool_id>>> 4506440088 s1_pool_id>>>>> 4509501032 students_pool_id>>> 4506440088 s1_pool_id>>>>> 4509501032 students_pool_id>>> 4506440088 s1_pool_id>>>>> 4509501032
~~~