python操作mysql及mysql連線池
阿新 • • 發佈:2021-01-21
技術標籤:python進階mysqlmysqlpythonsql連線池
作為一個合格的crud工程師,我真的就只記得crud,最近由於忘記了mysql賬號密碼(包括root賬號),在找回賬號的過程中順便就梳理了一些常用的mysql用法。
忘記root密碼
第一步: 關閉mysql
$ service mysql stop
第二步: 修改my.cnf檔案
$ vim /etc/mysql/my.cnf
第三步: 重啟mysql服務,並進入mysql修改密碼
$ service mysql restart
$ mysql
mysql> use mysql;
mysql> update user set authentication_string=PASSWORD("new_pass") where user='root';
第四步: 撤回對my.cnf檔案的修改
Python操作MySQL
python操作MySQL的常用方法是使用pymysql模組。在單次使用時,只需要直接呼叫connect()方法即可。
import pymysql
#連結資料庫
db = pymysql.connect(host='yourhost',user='yourname',passwd='yourpasswd',db='yourdb')
cursor = db.cursor()
cursor. execute("select * from yourdb")
result = cursor.fetchone()
但是為了達到複用的效果,可以選擇對一些常用的操作進行封裝。例如,新建一個操作mysql的工具類,命名檔案為lib_mysql.py,類中封裝exec_sql()方法用於查詢。
import pymysql
from urllib.parse import urlparse
class Client:
def __init__(self, url, **kwargs):
self.url = url
if not url:
return
url = urlparse(url)
settings = {
'host': url.hostname,
'port': url.port or 3306,
'charset': 'utf8',
'db': url.path.strip().strip('/'),
'user': url.username,
'passwd': url.password,
'cursorclass': pymysql.cursors.DictCursor
}
self.settings = settings
if kwargs:
self.settings.update(kwargs)
self.conn = pymysql.connect(**self.settings)
self.cursor = self.conn.cursor()
def exec_sql(self, sql, fetch_rows=False, commit=True, raise_exc=False):
try:
self.cursor.execute(sql)
if commit:
self.conn.commit()
except Exception as exc:
logger.exception('exe_sql got exception: {}'.format(str(exc)))
if raise_exc:
raise
if commit:
self.conn.rollback()
if fetch_rows:
rows = self.cursor.fetchall()
return rows
這樣之後想要查詢mysql時,只需要import lib_mysql即可。例如:
>>> import lib_mysql
>>> client = lib_mysql.Client(url='mysql://test:[email protected]@localhost:3306/test?charset=utf8')
>>> sql = '''select * from conn_pool_test'''
>>> record = client.exec_sql(sql,fetch_rows=True)
>>> type(record)
<class 'list'>
>>> record
[{'id': 1, 'app_id': '001', 'app_name': 'android_test'},
{'id': 2, 'app_id': '002', 'app_name': 'ios_test'},
{'id': 3, 'app_id': '003', 'app_name': 'web_test'},
{'id': 4, 'app_id': '004', 'app_name': 'mac_test'},
{'id': 5, 'app_i
同樣地,可以在工具類中封裝插入方法,例如:
def upsert_to_mysql(self, items, table, fields):
try:
fields_str = ','.join(fields)
replace_str = ','.join(['%s' for _ in range(len(fields))])
fields_update_str = ','.join(['{}=values({})'.format(field, field) for field in fields])
sql = '''
insert ignore into {table} ( {fields} )
values ({replace_str})
on duplicate key update {fields_update}
'''.format(table=table, replace_str=replace_str, fields=fields_str, fields_update=fields_update_str)
records = []
for item in items:
row = tuple([item[field] for field in fields])
records.append(row)
self.cursor.executemany(sql, records)
self.conn.commit()
except Exception as e:
print(e)
traceback.print_exc()
self.conn.rollback()
呼叫插入方法:
>>> fields = ['id', 'app_id' ,'app_name']
>>> data = [{'id': 6, 'app_id': '006', 'app_name': 'Solor'}, {'id': 7, 'app_id': '007', 'app_name': 'Symbian'}]
>>> client.upsert_to_mysql(table='conn_pool_test', fields=fields, records=data)
total 2 records
processed 2 records
MySQL連線池
如果每一次連線資料庫只是做了簡單操作,然後反覆連線斷連。如果短時間內連線次數過多,會給資料庫帶來壓力。因此,可以採用連線池的方式,讓連線重複使用,降低資料庫資源消耗。
這裡介紹採用pymysql和DBUtils實現mysql連線池的方式。
import pymysql
from dbutils.pooled_db import PooledDB, SharedDBConnection
from urllib.parse import urlparse
class MysqlPool(object):
def __init__(self, url):
self.url = url
if not url:
return
url = urlparse(url)
self.POOL = PooledDB(
creator=pymysql,
maxconnections=10, # 連線池的最大連線數
maxcached=10,
maxshared=10,
blocking=True,
setsession=[],
host=url.hostname,
port=url.port or 3306,
user=url.username,
password=url.password,
database=url.path.strip().strip('/'),
charset='utf8',
)
def __new__(cls, *args, **kw):
if not hasattr(cls, '_instance'):
cls._instance = object.__new__(cls)
return cls._instance
def connect(self):
conn = self.POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn, cursor
def connect_close(self,conn, cursor):
cursor.close()
conn.close()
def fetch_all(self,sql, args):
conn, cursor = self.connect()
if args is None:
cursor.execute(sql)
else:
cursor.execute(sql, args)
record_list = cursor.fetchall()
return record_list
def fetch_one(self,sql, args):
conn, cursor = self.connect()
cursor.execute(sql, args)
result = cursor.fetchone()
return result
def insert(self,sql, args):
conn, cursor = self.connect()
row = cursor.execute(sql, args)
conn.commit()
self.connect_close(conn, cursor)
return row
更多內容歡迎個人公眾號:PythonAndDataTech