1. 程式人生 > >python利用pymysql操作mysql資料庫

python利用pymysql操作mysql資料庫

python: ver 3.6

1, 安裝pymysql

聯網狀態下,直接執行 `easy_install-3.6.exe PyMySQL` 即可安裝(也可以使用 `pip install PyMySQL`)

2, 需求:

    a, 利用pymysql連線到mysql資料庫

    b, 根據本地的sql檔案建立表

    注: * sql檔案格式參考MySQL-Front匯出的sql檔案

            * sql檔案的名字就是資料庫的名字,eg:Test.sql就是資料庫Test匯出的sql檔案

    c, 往資料庫中插入資料

    d, 查詢資料

#!/usr/bin/python
# -*- coding: utf-8 -*-

import os
import re
import sys
import pymysql


class MyDB:
    """Small convenience wrapper around a single PyMySQL connection.

    The constructor connects to the server immediately. The instance can
    then create the tables described in a ``.sql`` dump file, insert a row
    into the ``User`` table of the ``Test`` database and print the content
    of a table.
    """

    def __init__(self, host, port, user, passwd):
        """Store the connection settings and connect to the server.

        Raises:
            RuntimeError: if the connection cannot be established.
        """
        self._host = host
        self._port = port
        self._user = user
        self._passwd = passwd
        self._conn = None
        self._cursor = None
        self._connect()

    def __del__(self):
        print('----del---------')
        self.close()

    def close(self):
        """Close the cursor and the connection.

        Safe to call more than once and on an instance whose connection
        attempt failed (or whose ``__init__`` never ran), which is the
        situation ``__del__`` is in after a failed ``_connect``.
        """
        cursor = getattr(self, '_cursor', None)
        conn = getattr(self, '_conn', None)
        self._cursor = None
        self._conn = None
        for resource in (cursor, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Best-effort teardown: a failure to close must not raise
                # out of __del__ or hide the error that led here.
                pass

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier.

        Identifiers (database/table names) cannot be sent as query
        parameters, so they are quoted here, doubling embedded backticks.
        """
        return '`%s`' % str(name).replace('`', '``')

    def _connect(self):
        """Open the connection and create the shared cursor.

        Raises:
            RuntimeError: if connecting fails; the original exception is
                chained as ``__cause__``.
        """
        try:
            self._conn = pymysql.connect(host=self._host,
                                         port=self._port,
                                         user=self._user,
                                         passwd=self._passwd,
                                         charset='utf8',
                                         )
            self._cursor = self._conn.cursor()
        except Exception as err:
            # The password is deliberately left out of the message, and %s
            # is used for the port so a non-int port cannot break the
            # formatting of the error itself.
            raise RuntimeError('DB connect failed, ip[%s]port[%s]usr[%s]' %
                               (self._host, self._port, self._user)) from err

    def _execute_sql(self, sql, args=None):
        """Execute *sql* on the shared cursor.

        Args:
            sql: statement text; may contain ``%s`` placeholders when
                *args* is given.
            args: optional parameters passed to ``cursor.execute`` so the
                driver escapes the values.

        Raises:
            RuntimeError: if execution fails (after a rollback); the
                original exception is chained.
        """
        try:
            self._cursor.execute(sql, args)
        except Exception as err:
            self._conn.rollback()
            raise RuntimeError('Execute sql [%s] failed' % sql) from err

    def _select_db(self, db_name):
        """Make *db_name* the current database, creating it if missing.

        Raises:
            RuntimeError: if the database cannot be created or selected.
        """
        try:
            if not self._is_database_exist(db_name):
                self._execute_sql("CREATE DATABASE IF NOT EXISTS %s"
                                  % self._quote_identifier(db_name))
            self._conn.select_db(db_name)
        except Exception as err:
            raise RuntimeError("Select db [%s] failed!" % db_name) from err

    def _get_tables(self):
        """Return the table names of the current database as a list."""
        self._execute_sql("SHOW TABLES")
        return [row[0] for row in self._cursor.fetchall()]

    def update_mysqldb(self, file):
        """Create the tables described in the SQL dump *file*.

        The database name is taken from the file name (``Test.sql`` ->
        database ``Test``). Only ``CREATE TABLE`` statements whose table
        does not exist yet are executed; every other statement is skipped.

        Raises:
            RuntimeError: if the file cannot be read or a statement fails.
        """
        print('file name:%s' % file)
        # Read and validate the whole file before touching the server, so a
        # bad file does not leave a freshly created empty database behind.
        sql_cmds = list(MyDB.get_mysql_cmd(file))
        # Derive the database name from the sql file name,
        # e.g. Test.sql belongs to the database Test.
        db_name = os.path.split(file)[1].split('.')[0]
        self._select_db(db_name)
        tables = set(self._get_tables())

        # Statements that start with CREATE TABLE (case-insensitive).
        p_create = re.compile(r'^create table', re.I)
        # Backtick-quoted names anywhere in the statement; the first one
        # is taken as the table name.
        p_table = re.compile(r'`(.*?)`')
        for sql_cmd in sql_cmds:
            print(sql_cmd)
            names = p_table.findall(sql_cmd)
            if p_create.match(sql_cmd) and names:
                table_name = names[0]
                if table_name not in tables:
                    self._execute_sql(sql_cmd)
                    print('Create new table name:%s' % table_name)
                else:
                    print('table name:%s exist' % table_name)
            else:
                print('no match')
        self._conn.commit()

    def insert_data_to_user_db(self, user_id, name, age):
        """Insert one row into ``Test.User`` and commit.

        Raises:
            RuntimeError: if the database cannot be selected or the insert
                fails.
        """
        self._select_db('Test')
        # Values are passed as query parameters so the driver escapes them;
        # a quote inside *name* can no longer break or inject SQL.
        sql_insert = "INSERT INTO User (id, name, age) VALUES (%s, %s, %s)"
        self._execute_sql(sql_insert, (user_id, name, age))
        self._conn.commit()

    def select_data_from_db(self, db_name, table_name):
        """Print every row of *table_name* in database *db_name*.

        *table_name* is expected to be a plain, unquoted table name; it is
        backtick-quoted before being placed in the statement.

        Raises:
            RuntimeError: if the database cannot be selected or the query
                fails.
        """
        self._select_db(db_name)
        self._execute_sql("SELECT * FROM %s"
                          % self._quote_identifier(table_name))
        for row in self._cursor.fetchall():
            print(row)

    @staticmethod
    def get_mysql_cmd(file):
        """Yield the SQL statements contained in *file* one at a time.

        Blank lines and lines starting with ``/`` or ``#`` are skipped.
        The remaining lines are accumulated until a line ends with ``;``
        (trailing whitespace ignored); the accumulated text, line breaks
        included, is then yielded as one statement. Text after the last
        ``;`` is not yielded.

        This is a generator, so the errors below surface when iteration
        starts, not when the function is called.

        Raises:
            RuntimeError: ``'not sql file'`` if *file* does not end with
                ``.sql``; ``'open file [...] failed'`` if it cannot be
                read or decoded as UTF-8.
        """
        if not file.endswith('.sql'):
            raise RuntimeError('not sql file')
        try:
            with open(file, mode='r', encoding='UTF-8') as f_obj:
                lines = f_obj.readlines()
        except (OSError, UnicodeError) as err:
            raise RuntimeError('open file [%s] failed' % file) from err

        pending = []
        for line in lines:
            if not line.strip() or line.startswith(('/', '#')):
                continue
            pending.append(line)
            # rstrip() so that "...; " or ";\t" still terminates a statement.
            if line.rstrip().endswith(';'):
                yield ''.join(pending)
                pending = []

    def test(self):
        """Print the names of all databases visible to this connection."""
        print('--------test start-----')
        self._execute_sql("SHOW DATABASES")
        for row in self._cursor.fetchall():
            print('database_name:%s' % row[0])
        print('--------test end-----')

    def _is_database_exist(self, database_name):
        """Return True if ``SHOW DATABASES`` lists exactly *database_name*."""
        self._execute_sql("SHOW DATABASES")
        return any(row[0] == database_name
                   for row in self._cursor.fetchall())


if __name__ == '__main__':
    # Demo entry point: <script> ip sql_file
    # A wrong argument count is a usage problem, not a runtime failure, so
    # it exits with the usage text instead of a wrapped exception traceback.
    if len(sys.argv) != 3:
        sys.exit('Usage: %s ip sql_file!' % sys.argv[0])
    ip, sql_file = sys.argv[1], sys.argv[2]

    try:
        # NOTE(review): port and credentials are hard-coded demo values.
        conn = MyDB(host=ip, port=3306, user='root', passwd='123')
        conn.update_mysqldb(sql_file)
        conn.insert_data_to_user_db(103, 'Uzi', 9999)
        conn.select_data_from_db('Test', 'User')
    except Exception as e:
        # Keep the original failure attached as the explicit cause.
        raise RuntimeError('Error: %s' % str(e)) from e

out: