1. 程式人生 > 資料庫 >python MySQL庫的操作_DB庫

python MySQL庫的操作_DB庫

import pymysql
from DBUtils.PooledDB import PooledDB
import time
from loguru import logger

# 資料庫連線
class MysqlCline(object):
    def __init__(self, ip='localhost', port=3306, user='root', passwd='root'):
        mysql_params = {
            "host": ip,
            "port": port,
            "user": user,
            "passwd": passwd,
            # "charset": charset,
        }
        while True:
            try:
                self.db_pool = PooledDB(pymysql, 2, **mysql_params)
                break
            except Exception as e:
                logger.warning(f"資料庫 連線 其他錯誤--{str(e)}")
                time.sleep(5)
                continue


# 資料庫增刪改查
class MysqlBasicOperation(MysqlCline):

    def select_db(self, sql):
        """查詢"""

        conn = self.db_pool.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql)
            data = cursor.fetchall()
        except Exception as e:
            data = "mysql 查詢操作 出現錯誤!!!!!!!!!!" + str(e)
        return data

    def execute_db(self, sql, cao_zuo):
        """更新/新增/刪除"""
        conn = self.db_pool.connection()
        cursor = conn.cursor()
        try:
            data_status = cursor.execute(sql)
            conn.commit()  # 提交
        except Exception as e:
            data_status = f"mysql {cao_zuo}操作 出現錯誤!!!!!!!!!!" + str(e) + str(sql)
            # 回滾所有更改
            conn.rollback()
        return data_status

    def mysql_insert_dan(self, table_name, item_json):
        """插入mysql資料
       @param table_name: 資料庫.資料表 的形式 查詢表
       @param item_json: json 資料
       @return: 返回插入狀態   0-資料已經存在  1-插入成功
       """
        conn = self.db_pool.connection()  # 使用cursor()方法獲取操作遊標
        cursor = conn.cursor()
        keys = ','.join(item_json.keys())
        values = ','.join(['%s'] * len(item_json))
        sql = f'INSERT IGNORE INTO {table_name} ({keys}) values({values})'
        # print(sql)
        # print(tuple(item_json.values()))
        try:
            data_status = cursor.execute(sql, tuple(item_json.values()))
            conn.commit()  # 提交
        except Exception as e:
            e = "mysql 增加處 出現錯誤!!!!!!!!!!" + str(e)
            conn.rollback()
            data_status = e
        # cursor.close()
        # conn.close()
        # print(data_status)
        return data_status

    # 增
    def mysql_insert(self, table_name, item_json):
        keys = ','.join(item_json.keys())
        sql = f'INSERT IGNORE INTO {table_name} ({keys}) values {tuple(item_json.values())}'
        if "None" in sql:
            sql = sql.replace("None", "Null")
        data_status = self.execute_db(sql, cao_zuo='增加')
        return data_status

    # 刪
    def mysql_delete(self, table_name, where_str=''):
        if not where_str:
            logger.warning("必須傳值 要不然刪除的是整個 資料庫")
            return
        sql = f"DELETE FROM {table_name} {where_str}"
        data_status = self.execute_db(sql, cao_zuo='刪除')
        return data_status

    # 改
    def mysql_updat(self, table_name, updata_str, where_str):
        sql = f"update {table_name} set {updata_str} WHERE {where_str}"
        data_status = self.execute_db(sql, cao_zuo='修改')
        return data_status

    # 查
    def mysql_select(self, table_name, *args, where_str="", sql_str=""):
        """
        :param table_name: 資料庫.資料表 的形式 查詢表
        :param args:  查詢的欄位 可以是任何東西  count(*) * "id, name"
        :param where_str: 查詢條件:預設是""
        :return: 返回查詢到的資料
        """
        if sql_str:
            sql = sql_str
        else:
            if where_str:
                sql = f'select {",".join(args)} from {table_name} {where_str}'
            else:
                sql = f'select {",".join(args)} from {table_name}'
        datas = self.select_db(sql)
        if args == ('count(*)',):
            datas = datas[0].get("count(*)")
        else:
            datas = datas
        return datas

    # 相當於更新插入    每個欄位必須傳,要不然 覆蓋為 kong
    def mysql_fugai(self, table_name, item_json):
        keys = ','.join(item_json.keys())
        sql = f'REPLACE INTO {table_name} ({keys}) values {tuple(item_json.values())}'
        data_status = self.execute_db(sql, cao_zuo='覆蓋')
        # print(data_status)
        # data_status 2 更新成功 1 插入資料
        return data_status