1. 程式人生 > 其它 >python操作mysql及mysql連線池

python操作mysql及mysql連線池

技術標籤:python進階mysqlmysqlpythonsql連線池

作為一個合格的crud工程師,我真的就只記得crud,最近由於忘記了mysql賬號密碼(包括root賬號),在找回賬號的過程中順便就梳理了一些常用的mysql用法。

忘記root密碼

第一步: 關閉mysql

$ service mysql stop

第二步: 修改my.cnf檔案

$ vim /etc/mysql/my.cnf

第三步: 重啟mysql服務,並進入mysql修改密碼

$ service mysql restart
$ mysql
mysql> use mysql;
mysql> update user set
authentication_string=PASSWORD("new_pass") where user='root';

第四步: 撤回對my.cnf檔案的修改

Python操作MySQL

python操作MySQL的常用方法是使用pymysql模組。在單次使用時,只需要直接呼叫connect()方法即可。

import pymysql
#連結資料庫
db = pymysql.connect(host='yourhost',user='yourname',passwd='yourpasswd',db='yourdb')
cursor = db.cursor()  
cursor.
execute("select * from yourdb") result = cursor.fetchone()

但是為了達到複用的效果,可以選擇對一些常用的操作進行封裝。例如,新建一個操作mysql的工具類,命名檔案為lib_mysql.py,類中封裝exec_sql()方法用於查詢。

import pymysql
from urllib.parse import urlparse

class Client:
      def __init__(self, url, **kwargs):
          self.url = url
          if
not url: return url = urlparse(url) settings = { 'host': url.hostname, 'port': url.port or 3306, 'charset': 'utf8', 'db': url.path.strip().strip('/'), 'user': url.username, 'passwd': url.password, 'cursorclass': pymysql.cursors.DictCursor } self.settings = settings if kwargs: self.settings.update(kwargs) self.conn = pymysql.connect(**self.settings) self.cursor = self.conn.cursor() def exec_sql(self, sql, fetch_rows=False, commit=True, raise_exc=False): try: self.cursor.execute(sql) if commit: self.conn.commit() except Exception as exc: logger.exception('exe_sql got exception: {}'.format(str(exc))) if raise_exc: raise if commit: self.conn.rollback() if fetch_rows: rows = self.cursor.fetchall() return rows

這樣之後想要查詢mysql時,只需要import lib_mysql即可。例如:

>>> import lib_mysql
>>> client = lib_mysql.Client(url='mysql://test:[email protected]@localhost:3306/test?charset=utf8')
>>> sql = '''select * from conn_pool_test'''
>>> record = client.exec_sql(sql,fetch_rows=True)
>>> type(record)
<class 'list'>
>>> record
[{'id': 1, 'app_id': '001', 'app_name': 'android_test'}, 
 {'id': 2, 'app_id': '002', 'app_name': 'ios_test'}, 
 {'id': 3, 'app_id': '003', 'app_name': 'web_test'}, 
 {'id': 4, 'app_id': '004', 'app_name': 'mac_test'}, 
 {'id': 5, 'app_i

同樣地,可以在工具類中封裝插入方法,例如:

def upsert_to_mysql(self, items, table, fields):
        try:
            fields_str = ','.join(fields)
            replace_str = ','.join(['%s' for _ in range(len(fields))])
            fields_update_str = ','.join(['{}=values({})'.format(field, field) for field in fields])
            sql = '''
            insert ignore into {table} ( {fields} )
            values ({replace_str})
            on duplicate key update {fields_update}
            '''.format(table=table, replace_str=replace_str, fields=fields_str, fields_update=fields_update_str)
            records = []
            for item in items:
                row = tuple([item[field] for field in fields])
                records.append(row)
            self.cursor.executemany(sql, records)
            self.conn.commit()
        except Exception as e:
            print(e)
            traceback.print_exc()
            self.conn.rollback()

呼叫插入方法:

>>> fields = ['id', 'app_id' ,'app_name']
>>> data = [{'id': 6, 'app_id': '006', 'app_name': 'Solor'}, {'id': 7, 'app_id': '007', 'app_name': 'Symbian'}]
>>> client.upsert_to_mysql(table='conn_pool_test', fields=fields, records=data)
total 2 records
processed 2 records

MySQL連線池

如果每一次連線資料庫只是做了簡單操作,然後反覆連線斷連。如果短時間內連線次數過多,會給資料庫帶來壓力。因此,可以採用連線池的方式,讓連線重複使用,降低資料庫資源消耗。
這裡介紹採用pymysql和DBUtils實現mysql連線池的方式。

import pymysql
from dbutils.pooled_db import PooledDB, SharedDBConnection
from urllib.parse import urlparse
class MysqlPool(object):

    def __init__(self, url):
        self.url = url
        if not url:
            return
        url = urlparse(url)
        self.POOL = PooledDB(
            creator=pymysql, 
            maxconnections=10, # 連線池的最大連線數 
            maxcached=10, 
            maxshared=10,
            blocking=True,  
            setsession=[],  
            host=url.hostname,
            port=url.port or 3306,
            user=url.username,
            password=url.password,
            database=url.path.strip().strip('/'),
            charset='utf8',
        )
    def __new__(cls, *args, **kw):
        if not hasattr(cls, '_instance'):
            cls._instance = object.__new__(cls)
        return cls._instance

    def connect(self):
        conn = self.POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    def connect_close(self,conn, cursor):
        cursor.close()
        conn.close()

    def fetch_all(self,sql, args):
        conn, cursor = self.connect()
        if args is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, args)
        record_list = cursor.fetchall()
        return record_list

    def fetch_one(self,sql, args):
        conn, cursor = self.connect()
        cursor.execute(sql, args)
        result = cursor.fetchone()
        return result

    def insert(self,sql, args):
        conn, cursor = self.connect()
        row = cursor.execute(sql, args)
        conn.commit()
        self.connect_close(conn, cursor)
        return row

更多內容歡迎個人公眾號:PythonAndDataTech
在這裡插入圖片描述