python DBUtils 線程池 連接 Postgresql(多線程公用線程池,DB-API : psycopg2)
阿新 • • 發佈:2019-01-19
work 風險 等待 put pro 連接數 exist eve self.
一、DBUtils
DBUtils 是一套允許線程化 Python 程序可以安全和有效的訪問數據庫的模塊,DBUtils提供兩種外部接口: PersistentDB :提供線程專用的數據庫連接,並自動管理連接。 PooledDB :提供線程間可共享的數據庫連接,並自動管理連接。
操作數據庫模板:
1 import datetime 2 import sys 3 import os 4 import configparser 5 import logging 6 import psycopg2 7 8 from DBUtils.PooledDB importPooledDB 9 10 11 12 13 class DatabaseOperator(object): 14 ‘‘‘ 15 class for database operator 16 ‘‘‘ 17 18 19 def __init__(self, 20 database_config_path, database_config=None): 21 ‘‘‘ 22 Constructor 23 ‘‘‘ 24 self._database_config_path = database_config_path25 26 # load database configuration 27 if not database_config : 28 self._database_config = self.parse_postgresql_config(database_config_path) 29 else: 30 self._database_config = database_config 31 self._pool = None 32 33def database_config_empty(self): 34 if self._database_config: 35 return False 36 else: 37 return True 38 39 def parse_postgresql_config(self, database_config_path=None): 40 ‘‘‘解析pei數據庫配置文件 41 參數 42 --------- 43 arg1 : conf_file 44 數據庫配置文件路徑 45 返回值 46 -------- 47 dict 48 解析配置屬性dict--config 49 50 示例 51 -------- 52 無 53 ‘‘‘ 54 if database_config_path == None and self._database_config_path != None: 55 database_config_path = self._database_config_path 56 if not os.path.isfile(database_config_path): 57 sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path)) 58 parser = configparser.SafeConfigParser() 59 parser.read(database_config_path) 60 config = {} 61 config[‘database‘] = parser.get(‘UniMonDB‘, ‘Database‘) 62 config[‘db_user‘] = parser.get(‘UniMonDB‘, ‘UserName‘) 63 config[‘db_passwd‘] = parser.get(‘UniMonDB‘, ‘Password‘) 64 config[‘db_port‘] = parser.getint(‘UniMonDB‘, ‘Port‘) 65 config[‘db_host‘] = parser.get(‘UniMonDB‘, ‘Servername‘) 66 self._database_config = config 67 68 return config 69 70 71 def get_pool_conn(self): 72 73 if not self._pool: 74 self.init_pgsql_pool() 75 return self._pool.connection() 76 77 def init_pgsql_pool(self): 78 ‘‘‘利用數據庫屬性連接數據庫 79 參數 80 --------- 81 arg1 : config 82 數據庫配置屬性 83 返回值 84 -------- 85 86 示例 87 -------- 88 無 89 ‘‘‘ 90 # 字典config是否為空 91 config = self.parse_postgresql_config() 92 POSTGREIP = config[‘db_host‘] 93 POSTGREPORT = config[‘db_port‘] 94 POSTGREDB = config[‘database‘] 95 POSTGREUSER = config[‘db_user‘] 96 POSTGREPASSWD = config[‘db_passwd‘] 97 try: 98 logging.info(‘Begin to create {0} postgresql pool on:{1}.\n‘.format(POSTGREIP, datetime.datetime.now())) 99 100 pool = PooledDB( 101 creator=psycopg2, # 使用鏈接數據庫的模塊mincached 102 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 103 mincached=1, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 104 maxcached=4, # 鏈接池中最多閑置的鏈接,0和None不限制 105 blocking=True, # 連接池中如果沒有可用連接後,是否阻塞等待。True,等待;False,不等待然後報錯 106 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 107 setsession=[], # 開始會話前執行的命令列表。 108 host=POSTGREIP, 109 port=POSTGREPORT, 110 user=POSTGREUSER, 111 password=POSTGREPASSWD, 112 database=POSTGREDB) 113 self._pool = pool 114 logging.info(‘SUCCESS: create postgresql success.\n‘) 115 116 except Exception as e: 117 logging.error(‘ERROR: create postgresql pool failed:{0}\n‘) 118 self.close_db_cursor() 119 sys.exit(‘ERROR: create postgresql pool error caused by {0}‘.format(str(e))) 120 121 122 def pg_select_operator(self, sql): 123 ‘‘‘進行查詢操作,函數返回前關閉cursor,conn 124 參數 125 --------- 126 arg1 : sql查詢語句 127 返回值 128 -------- 129 list:result 130 類型為list的查詢結果:result 131 132 示例 133 -------- 134 無 135 ‘‘‘ 136 # 執行查詢 137 try: 138 conn = self.get_pool_conn() 139 cursor = conn.cursor() 140 cursor.execute(sql) 141 result = cursor.fetchall() 142 except Exception as e: 143 logging.error(‘ERROR: execute {0} causes error‘.format(sql)) 144 sys.exit(‘ERROR: load data from database error caused {0}‘.format(str(e))) 145 finally: 146 cursor.close() 147 conn.close() 148 return result 149 150 def test_pool_con(self): 151 sql = ‘select * from tbl_devprofile‘ 152 result = self.pg_select_operator(sql) 153 print(result) 154 155 def pg_insert_operator(self, sql): 156 157 result = False 158 try: 159 conn = self.get_pool_conn() 160 cursor = conn.cursor() 161 cursor.execute(sql) 162 result = True 163 except Exception as e: 164 logging.error(‘ERROR: execute {0} causes error‘.format(sql)) 165 sys.exit(‘ERROR: insert data from database error caused {0}‘.format(str(e))) 166 finally: 167 cursor.close() 168 conn.commit() 169 conn.close() 170 return result 171 172 def pg_update_operator(self, sql): 173 174 result = False 175 try: 176 conn = self.get_pool_conn() 177 cursor = conn.cursor() 178 cursor.execute(sql) 179 result = True 180 except Exception as e: 181 logging.error(‘ERROR: execute {0} causes error‘.format(sql)) 182 sys.exit(‘ERROR: update data from database error caused {0}‘.format(str(e))) 183 finally: 184 cursor.close() 185 conn.commit() 186 conn.close() 187 return result 188 189 def pg_delete_operator(self, sql): 190 result = False 191 # 執行查詢 192 try: 193 conn = self.get_pool_conn() 194 cursor = conn.cursor() 195 cursor.execute(sql) 196 result = True 197 except Exception as e: 198 logging.error(‘ERROR: execute {0} causes error‘.format(sql)) 199 sys.exit(‘ERROR: delete data from database error caused {0}‘.format(str(e))) 200 finally: 201 cursor.close() 202 conn.commit() 203 conn.close() 204 return result 205 206 207 def close_pool(self): 208 ‘‘‘關閉pool 209 參數 210 --------- 211 無 212 213 返回值 214 -------- 215 無 216 示例 217 -------- 218 無 219 ‘‘‘ 220 if self._pool != None: 221 self._pool.close() 222 223 if __name__ == ‘__main__‘: 224 path = "E:\\Users\\Administrator\\eclipse-workspace\\com.leagsoft.basemodule\\base\\config\\sql_conf.conf" 225 db = DatabaseOperator( 226 database_config_path=path) 227 db.test_pool_con()
二、多線程
原理:創建多個線程類,多個線程類共享一個隊裏Queue,每一個線程類可以操作數據庫
1 from threading import Thread 2 3 class Worker(Thread): 4 def __init__(self, queue): 5 Thread.__init__(self) 6 self.queue = queue 7 8 def run(self): 9 while True: 10 # Get the work from the queue and expand the tuple 11 # 從隊列中獲取任務 12 database_operator, device, stand_alone_result = self.queue.get() 13 operateResult(database_operator, device, stand_alone_result) 14 # 任務執行完之後要通知隊列 15 self.queue.task_done()
填充隊列
1 # 使用隊列多線程 2 logging.info(‘begin to update all device risk score by multi_processing.\n‘) 3 from queue import Queue 4 queue = Queue() 5 # 六個線程,每個線程共享一個隊列 6 for _ in range(6): 7 worker = Worker(queue) 8 worker.setDaemon(True) 9 worker.start() 10 11 for record in all_devid: 12 device = record[0] 13 devtype = record[1] 14 all_countlist = all_dict.get(device) 15 stand_alone_result = device_assess(all_countlist) 16 if (devtype in (server_devtype + computer_devtype)) and (stand_alone_result < 100): 17 stand_alone_result *= 0.8 18 # 將設備風險評分數據保存到數據庫中 19 queue.put((database_operator, device, stand_alone_result)) 20 21 #等待隊列任務執行完 22 queue.join() 23 24 25 def operateResult(database_operator, device, stand_alone_result): 26 ‘‘‘ 27 函數名稱: device_assess 28 描述: 保存單臺設備分數到數據庫 29 調用: 無 30 被調用: main 31 被訪問的表: tbl_devprofile 32 被修改的表: 無 33 輸入參數: database_operator, device:設備uid, stand_alone_result:單臺設備風險分數 34 輸出參數:無 35 返回值: 單臺設備風險分數值 36 其它: 無 37 ‘‘‘ 38 import time 39 find_profile_sql = "SELECT uiddevrecordid FROM tbl_devprofile WHERE uiddevrecordid=‘{0}‘;".format(device) 40 isExistRecord = database_operator.pg_select_operator(find_profile_sql) 41 #currentTime=datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘) 42 currentTime=time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time())) 43 if len(isExistRecord) > 0: 44 updata_profile_sql = "UPDATE tbl_devprofile SET irisklevel={0}, dtrisktime=‘{1}‘ 45 WHERE uiddevrecordid=‘{2}‘;".format(stand_alone_result, currentTime, device) 46 database_operator.pg_update_operator(updata_profile_sql) 47 else: 48 insert_profile_sql = "INSERT INTO tbl_devprofile VALUES(‘{0}‘,NULL,NULL,NULL,NULL,NULL,NULL,NULL,{1},‘{2}‘);".format( 49 device, stand_alone_result, currentTime) 50 database_operator.pg_insert_operator(insert_profile_sql)
使用單線程時,執行完代碼花費20s左右,使用多線程時花費5s左右。
Reference:
[1] https://blog.csdn.net/zhaihaifei/article/details/54016939
[2] https://www.cnblogs.com/hao-ming/p/7215050.html?utm_source=itdadao&utm_medium=referral
[3] https://www.cnblogs.com/wozijisun/p/6160065.html (多線程)
[4] http://www.lpfrx.com/archives/4431/
[5] https://www.cnblogs.com/95lyj/p/9047554.html
python DBUtils 線程池 連接 Postgresql(多線程公用線程池,DB-API : psycopg2)