1. 程式人生 > 其它 >python 連線 mysql 工具

python 連線 mysql 工具

import pymysql
import re
import time
import math
from datetime import datetime
import traceback
import json
import threading
import logging
import queue
import sys
import configparser

import pandas as pd
from log import logger
from QQemail import QQemail
QQ=QQemail()

log_human='update_human_name.log
' class HandleMysql: def __init__(self, filepath=None, confname=None,log_file='HandleMysql.log'): configpath = 'config.ini' self.cfg = configparser.RawConfigParser() self.cfg.read(configpath) self.confname = confname self.log = log_file host = self.cfg.get(self.confname, "
host") user = self.cfg.get(self.confname, "user") password = self.cfg.get(self.confname, "passwd") db = self.cfg.get(self.confname, "db") port = self.cfg.get(self.confname, "port") charset = self.cfg.get(self.confname, "charset") try: self.conn
= pymysql.Connect(host=host, user=user, password=password, db=db, port=int(port), charset=charset) self.cur = self.conn.cursor(pymysql.cursors.DictCursor) except Exception as e: logger("initialate db conn error: {}".format(str(e)),'error',self.log) """執行操作資料的相關sql""" def execute(self, sql, data): try: self.cur.execute(sql, data) self.conn.commit() except: self.conn.ping() self.cur.execute(sql, data) self.conn.commit() """執行更新sql""" def update(self, sql): try: self.cur.execute(sql) self.conn.commit() except: self.conn.ping() self.cur.execute(sql) self.conn.commit() """批量執行更新sql""" def execute_many(self, sql, datas): try: self.cur.executemany(sql, datas) self.conn.commit() except: self.conn.ping() self.cur.executemany(sql, datas) self.conn.commit() """執行查詢sql""" def search(self, sql): try: self.cur.execute(sql) except: self.conn.ping() self.cur.execute(sql) return self.cur.fetchall() """執行查詢sql""" def search(self, sql, data=None): try: self.cur.execute(sql, data) except: self.conn.ping() self.cur.execute(sql, data) return self.cur.fetchall() """批量插入資料庫""" def insert_mysql(self,insert_sql,datas): try: start = datetime.now() self.execute_many(insert_sql, datas) delta = (datetime.now() - start).seconds rows = len(datas) print('update insert {} rows, cost:{} s \n'.format(rows,delta)) logger('update insert {} rows, cost:{} s \n'.format(rows,delta),'info',self.log) return rows,delta except Exception as e: print('insert mysql error {} '.format(e),'error') text = '{} update fail error: {} '.format(insert_sql,e) #QQ.send(text,'【GPU】【warning】 insert sql update fail】') logger('insert mysql error {} '.format(e),'error',self.log) """批量插入資料庫""" def into_mysql(self,table,res): df_list = res.values.tolist() cols = res.columns #print('--- 插入 MySql 資料庫 --- ') try: insert = """ INSERT into {}({}) VALUES({}) ON DUPLICATE KEY UPDATE {} """.format(table,','.join(cols),','.join(['%s']*len(cols)),','.join([i+'=values('+i+')' for i in cols])) #print(insert) self.insert_mysql(insert,df_list) except Exception as e: print('======> Exception {} <======='.format(e)) logger('insert mysql error {} '.format(e),'error',self.log) return """批量更新資料庫""" def update_mysql(self,table,set_col,case_col,data): # data dict #results = [{'id':210,'legal_entity_id':'zhangsan'},{'id':212,'legal_entity_id':'zhangsan11'},] if not data: return try: sql = self.update_sql(data,table,set_col,case_col) self.update(sql) except Exception as e: print('======> Exception {} <======='.format(e)) logger('update mysql error {} '.format(e),'error',self.log) return def update_sql(self,data,table,set_col,case_col): """set_col : set list columns case_col : string """ sql_set = "UPDATE {} SET \n".format(table) sql_in = str(data[0][case_col]) sql_case_list = [] for inx_col,col in enumerate(set_col): sql_col_1 = '''\n {} = CASE {} \n '''.format(col,case_col) sql_col_2 = '' for inx,res in enumerate(data): sql_col_2 += """ WHEN {} THEN '{}'""" .format(res[case_col],res[col]) if (inx_col == 0 and inx != 0): sql_in += ',' + str(res[case_col]) sql_col_2 +=' END' sql_col_1 += sql_col_2 sql_case_list.append(sql_col_1) sql_where= ','.join(sql_case_list) update_sql = sql_set + sql_where + "\nWHERE " + case_col + " IN (%s)" % (sql_in) print('update sql {} rows',len(sql_case_list)) return update_sql """關閉資料庫連線""" def close(self): self.cur.close() self.conn.close()