1. 程式人生 > 實用技巧 >python pymysql操作資料庫

python pymysql操作資料庫

import pymysql


def creatDB(dbName):
    """
    dbName:資料庫名稱
    建立資料庫
    """
    conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', charset='utf8')
    myCursor = conn.cursor()
    myCursor.execute("CREATE DATABASE {}".format(str(dbName)))
    myCursor.close()
    conn.close()


def creatTable(dbName, tableName): """ dbName:資料庫名稱 tableName:表名稱 建立表 """ conn = pymysql.connect( host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8' ) myCursor = conn.cursor() sql
= "CREATE TABLE {} (" \ "id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, " \ "name VARCHAR(255) UNIQUE, " \ "age TINYINT DEFAULT NULL, " \ "gender boolean DEFAULT FALSE, " \ "testText TEXT, " \ "img mediumblob DEFAULT NULL)".format(str(tableName)) myCursor.execute(sql) myCursor.close() conn.close()
def getAllTables(dbName): """ dbName:資料庫名稱 tableName:表名稱 dataDic:一個字典,鍵是表的欄位,值是要插入的值 獲取所有的表名稱,返回值是一個包含所有表名稱的列表 """ tableList = [] conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() myCursor.execute("show tables") tableNames = myCursor.fetchall() for tableName in tableNames: tableList.append(tableName[0]) myCursor.close() conn.close() return tableList def insertIntoTable(dbName, tableName, dataDic): """ dbName:資料庫名稱 tableName:表名稱 dataDic:一個字典,鍵是表的欄位,值是要插入的值 往資料庫裡插入一條資料 """ conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() keys = ', '.join(dataDic.keys()) values = ', '.join(['%s'] * len(dataDic)) sql = 'INSERT INTO {table} ({keys}) VALUES ({values})'.format(table=tableName, keys=keys, values=values) try: if myCursor.execute(sql, tuple(dataDic.values())): print('插入資料成功') conn.commit() lastID = myCursor.lastrowid return lastID except Exception as e: print(e) print('插入資料失敗') conn.rollback() finally: myCursor.close() conn.close() def updateIntoTable(dbName, tableName, dataDic): """ dbName:資料庫名稱 tableName:表名稱 dataDic:一個字典,鍵是表的欄位,值是要插入的值 如果資料庫裡面有這條資料,就對這條資料進行修改操作,如果沒有,就增加一條資料,進行修改操作時,ID不變。 返回值是最新插入資料的ID或被修改資料的原ID。 """ conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() keys = ', '.join(dataDic.keys()) values = ', '.join(['%s'] * len(dataDic)) sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE'.format(table=tableName, keys=keys, values=values) update = ','.join([" {key} = %s".format(key=key) for key in dataDic]) sql += update try: if myCursor.execute(sql, tuple(dataDic.values()) * 2): print('插入資料成功') conn.commit() lastID = myCursor.lastrowid return lastID except Exception as e: print(e) print('插入資料失敗') conn.rollback() finally: myCursor.close() conn.close() def replaceIntoTable(dbName, tableName, dataDic): """ dbName:資料庫名稱 tableName:表名稱 dataDic:一個字典,鍵是表的欄位,值是要插入的值 如果資料庫裡面有這條資料,先刪除原來的資料在進行增加,如果沒有,就增加一條資料,ID改變。 返回值是最新插入資料的ID。 """ conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() keys = ', '.join(dataDic.keys()) values = ', '.join(['%s'] * len(dataDic)) sql = 'REPLACE INTO {table} ({keys}) VALUES ({values})'.format(table=tableName, keys=keys, values=values) try: if myCursor.execute(sql, tuple(dataDic.values())): print('插入資料成功') conn.commit() lastID = myCursor.lastrowid return lastID except Exception as e: print(e) print('插入資料失敗') conn.rollback() finally: myCursor.close() conn.close() def delData(dbName, tableName, condition): """ dbName:資料庫名稱 tableName:表名稱 condition:刪除的條件 刪除資料 """ conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() sql = 'DELETE FROM {table} WHERE {condition}'.format(table=tableName, condition=condition) try: if myCursor.execute(sql): print('刪除資料成功') conn.commit() except Exception as e: print(e) print('刪除資料失敗') conn.rollback() finally: myCursor.close() conn.close() def selectDataAll(dbName, tableName): """ dbName:資料庫名稱 tableName:表名稱 查詢所有的資料 """ conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() sql = 'SELECT * FROM {table}'.format(table=tableName) if myCursor.execute(sql): allData = myCursor.fetchall() return allData else: return None def selectDataOne(dbName, tableName): """ dbName:資料庫名稱 tableName:表名稱 查詢第一條資料 """ conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() sql = 'SELECT * FROM {table}'.format(table=tableName) if myCursor.execute(sql): oneData = myCursor.fetchone() return oneData else: return None def selectDataMany(dbName, tableName, size): """ dbName:資料庫名稱 tableName:表名稱 size:想要查詢的資料 int型別 查詢多條資料 """ conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', database=str(dbName), charset='utf8') myCursor = conn.cursor() sql = 'SELECT * FROM {table}'.format(table=tableName) if myCursor.execute(sql): manyData = myCursor.fetchmany(size) return manyData else: return None