python利用pymysql操作mysql資料庫
阿新 • • 發佈:2018-12-19
python: ver 3.6
1, 安裝pymysql
聯網狀態下,直接在命令列執行 easy_install-3.6.exe PyMySQL 即可安裝(easy_install 已被棄用,也可以改用 pip install PyMySQL)
2, 需求:
a, 利用pymysql連線到mysql資料庫
b, 根據本地的sql檔案建立表
注: * sql檔案格式參考MySQL-Front匯出的sql檔案
* sql檔案的名字就是資料庫的名字,eg:Test.sql就是資料庫Test匯出的sql檔案
c, 往資料庫中插入資料
d, 查詢資料
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import re
import sys

import pymysql


class MyDB:
    """Small helper around one PyMySQL connection and its cursor.

    The connection is opened in the constructor.  Every failure in the
    helpers below is reported as ``RuntimeError`` with the underlying
    exception chained as ``__cause__``.
    """

    def __init__(self, host, port, user, passwd):
        """Store the connection settings and connect immediately.

        Args:
            host: Server host name or IP address.
            port: Server TCP port.
            user: Login user name.
            passwd: Login password.

        Raises:
            RuntimeError: If the connection cannot be established.
        """
        self._host = host
        self._port = port
        self._user = user
        self._passwd = passwd
        # Initialised before _connect() so close()/__del__ are safe even
        # when connecting fails part-way.
        self._conn = None
        self._cursor = None
        self._connect()

    def __del__(self):
        print('----del---------')
        self.close()

    def close(self):
        """Close the cursor and the connection; safe to call repeatedly."""
        cursor = getattr(self, '_cursor', None)
        conn = getattr(self, '_conn', None)
        self._cursor = None
        self._conn = None
        for resource in (cursor, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Best effort: this also runs from __del__, possibly during
                # interpreter shutdown or on an already-closed connection.
                pass

    def _connect(self):
        """Open the connection and create the shared cursor.

        Raises:
            RuntimeError: If connecting or creating the cursor fails.
        """
        try:
            self._conn = pymysql.connect(
                host=self._host,
                port=self._port,
                user=self._user,
                password=self._passwd,
                charset='utf8',
            )
            self._cursor = self._conn.cursor()
        except Exception as err:
            # The password is deliberately left out of the message, and %s is
            # used for the port so a non-int port cannot break the formatting.
            raise RuntimeError(
                'DB connect failed, ip[%s]port[%s]usr[%s]'
                % (self._host, self._port, self._user)
            ) from err

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier."""
        return '`%s`' % str(name).replace('`', '``')

    def _execute_sql(self, sql, params=None):
        """Execute one statement on the shared cursor.

        Args:
            sql: Statement text; use ``%s`` placeholders when *params* is given.
            params: Optional sequence/dict of values bound by the driver.

        Raises:
            RuntimeError: If execution fails (the transaction is rolled back).
        """
        try:
            self._cursor.execute(sql, params)
        except Exception as err:
            self._conn.rollback()
            raise RuntimeError('Execute sql [%s] failed' % sql) from err

    def _select_db(self, db_name):
        """Make *db_name* the current database, creating it if missing.

        Raises:
            RuntimeError: If the database cannot be created or selected.
        """
        try:
            if not self._is_database_exist(db_name):
                self._execute_sql(
                    'CREATE DATABASE IF NOT EXISTS %s'
                    % self._quote_identifier(db_name)
                )
            self._conn.select_db(db_name)
        except Exception as err:
            raise RuntimeError("Select db [%s] failed!" % db_name) from err

    def _get_tables(self):
        """Return the table names of the current database as a list."""
        self._execute_sql("SHOW TABLES")
        return [row[0] for row in self._cursor.fetchall()]

    def update_mysqldb(self, file):
        """Create the tables described in a SQL dump that do not exist yet.

        The database name is taken from the file name (``Test.sql`` maps to
        database ``Test``).  Only ``CREATE TABLE`` statements whose table
        name is backtick-quoted are executed; everything else is reported
        as ``no match``.

        Args:
            file: Path of the ``.sql`` file.

        Raises:
            RuntimeError: If the file is invalid or a statement fails.
        """
        print('file name:%s' % file)
        # Validated and read up front, so a bad file never creates a database.
        sql_cmds = MyDB.get_mysql_cmd(file)
        # Database name comes from the file name, e.g. Test.sql -> Test.
        db_name = os.path.split(file)[1].split('.')[0]
        self._select_db(db_name)
        tables = set(self._get_tables())
        # Statement must start with CREATE TABLE (case-insensitive).
        p_create = re.compile(r'^create table', re.I)
        # First backtick-quoted token in the statement is the table name;
        # search() is used because the token is not at the start.
        p_table = re.compile(r'`(.*?)`')
        for sql_cmd in sql_cmds:
            print(sql_cmd)
            is_create = p_create.match(sql_cmd.lstrip())
            name_match = p_table.search(sql_cmd)
            if not (is_create and name_match):
                print('no match')
                continue
            table_name = name_match.group(1)
            if table_name in tables:
                print('table name:%s exist' % table_name)
                continue
            self._execute_sql(sql_cmd)
            # Remember it so a duplicate definition in the same file is skipped.
            tables.add(table_name)
            print('Create new table name:%s' % table_name)
        self._conn.commit()

    def insert_data_to_user_db(self, user_id, name, age):
        """Insert one row into table ``User`` of database ``Test``.

        Values are passed as driver parameters, so quotes inside *name*
        cannot break (or inject into) the statement.

        Raises:
            RuntimeError: If the database cannot be selected or the insert fails.
        """
        self._select_db('Test')
        sql_insert = 'INSERT INTO User (id, name, age) VALUES (%s, %s, %s)'
        self._execute_sql(sql_insert, (user_id, name, age))
        self._conn.commit()

    def select_data_from_db(self, db_name, table_name):
        """Print every row of *table_name* in database *db_name*.

        Args:
            db_name: Database to select (created if missing).
            table_name: Bare table name; it is backtick-quoted before use.

        Raises:
            RuntimeError: If the database cannot be selected or the query fails.
        """
        self._select_db(db_name)
        self._execute_sql(
            'SELECT * FROM %s' % self._quote_identifier(table_name)
        )
        for row in self._cursor.fetchall():
            print(row)

    @staticmethod
    def get_mysql_cmd(file):
        """Return an iterator over the SQL statements stored in *file*.

        The file is checked and read immediately; statements are then
        produced lazily.  Blank lines and lines starting with ``/``, ``#``
        or ``-- `` are skipped.  A statement ends at a line whose last
        non-blank character is ``;``; a trailing statement without ``;`` is
        still returned.

        Args:
            file: Path of a ``.sql`` file encoded as UTF-8 (BOM allowed).

        Raises:
            RuntimeError: If *file* is not a ``.sql`` path or cannot be read.
        """
        if not isinstance(file, str) or not file.endswith('.sql'):
            raise RuntimeError('not sql file [%s]' % (file,))
        try:
            # utf-8-sig reads plain UTF-8 too and strips a BOM when present,
            # so the first line is not mistaken for non-comment text.
            with open(file, mode='r', encoding='utf-8-sig') as f_obj:
                lines = f_obj.readlines()
        except (OSError, UnicodeError) as err:
            raise RuntimeError('open file [%s] failed' % file) from err
        return MyDB._split_statements(lines)

    @staticmethod
    def _split_statements(lines):
        """Yield complete SQL statements assembled from raw file lines."""
        pending = []
        for line in lines:
            stripped = line.strip()
            if not stripped:
                continue
            if line.startswith(('/', '#', '-- ')) or stripped == '--':
                continue
            pending.append(line)
            if stripped.endswith(';'):
                yield ''.join(pending)
                pending = []
        if pending:
            # Last statement had no terminating ';'.
            yield ''.join(pending)

    def test(self):
        """Print the name of every database visible to the connection."""
        print('--------test start-----')
        self._cursor.execute("SHOW DATABASES")
        for row in self._cursor.fetchall():
            print('database_name:%s' % row[0])
        print('--------test end-----')

    def _is_database_exist(self, database_name):
        """Return True if ``SHOW DATABASES`` lists *database_name*."""
        self._cursor.execute("SHOW DATABASES")
        return any(row[0] == database_name for row in self._cursor.fetchall())


if __name__ == '__main__':
    try:
        if len(sys.argv) != 3:
            raise RuntimeError('Usage: %s ip sql_file!' % sys.argv[0])
        ip, sql_file = sys.argv[1], sys.argv[2]
        # NOTE(review): credentials are hard-coded demo values; replace them
        # (or read them from the environment) before real use.
        conn = MyDB(host=ip, port=3306, user='root', passwd='123')
        conn.update_mysqldb(sql_file)
        conn.insert_data_to_user_db(103, 'Uzi', 9999)
        conn.select_data_from_db('Test', 'User')
    except Exception as e:
        raise RuntimeError('Error: %s' % str(e)) from e
out: