Python資料庫連線池DBUtils詳解
Python資料庫連線池DBUtils詳解
what's the DBUtils
DBUtils 是一套用於管理資料庫連線池的Python包,為高頻度高併發的資料庫訪問提供更好的效能,可以自動管理連線物件的建立和釋放。並允許對非執行緒安全的資料庫介面進行執行緒安全包裝。
DBUtils提供兩種外部介面:
- PersistentDB :提供執行緒專用的資料庫連線,並自動管理連線。
- PooledDB :提供執行緒間可共享的資料庫連線,並自動管理連線。
實測證明 PersistentDB 的速度是最高的,但是在某些特殊情況下,資料庫的連線過程可能異常緩慢,而此時的PooledDB則可以提供相對來說平均連線時間比較短的管理方式。
另外,實際使用的資料庫驅動也有所依賴,比如SQLite資料庫只能使用PersistentDB作連線池。 下載地址:http://www.webwareforpython.org/downloads/DBUtils/
DBUtils使用方法
連線池物件只初始化一次,一般可以作為模組級程式碼來確保。 PersistentDB的連線例子:
import DBUtils.PersistentDB
persist=DBUtils.PersistentDB.PersistentDB(dbpai=MySQLdb,maxusage=1000,**kwargs)
這裡的引數dbpai指使用的底層資料庫模組,相容DB-API的。maxusage則為一個連線最大使用次數,參考了官方例子。後面的**kwargs則為實際傳遞給MySQLdb的引數。
獲取連線: conn=persist.connection() 實際程式設計中用過的連線直接關閉 conn.close() 即可將連線交還給連線池。
PooledDB使用方法同PersistentDB,只是引數有所不同。
- dbapi :資料庫介面
- mincached :啟動時開啟的空連線數量
- maxcached :連線池最大可用連線數量
- maxshared :連線池最大可共享連線數量
- maxconnections :最大允許連線數量
- blocking :達到最大數量時是否阻塞
- maxusage :單個連線最大複用次數
- setsession :用於傳遞到資料庫的準備會話,如 [”set name UTF-8″] 。
db=pooled.connection()
cur=db.cursor()
cur.execute(sql)
res=cur.fetchone()
cur.close() # or del cur
db.close() # or del db
python不用連線池的MySQL連線方法
import MySQLdb
conn= MySQLdb.connect(host='localhost',user='root',passwd='pwd',db='myDB',port=3306)
#import pymysql
#conn = pymysql.connect(host='localhost', port='3306', db='game', user='root', password='123456', charset='utf8')
cur=conn.cursor()
SQL="select * from table1"
r=cur.execute(SQL)
r=cur.fetchall()
cur.close()
conn.close()
用連線池後的連線方法
import MySQLdb
from DBUtils.PooledDB import PooledDB
pool = PooledDB(MySQLdb,5,host='localhost',user='root',passwd='pwd',db='myDB',port=3306) #5為連線池裡的最少連線數
conn = pool.connection() #以後每次需要資料庫連線就是用connection()函式獲取連線就好了
cur=conn.cursor()
SQL="select * from table1"
r=cur.execute(SQL)
r=cur.fetchall()
cur.close()
conn.close()
DBUtils下載地址:https://pypi.python.org/pypi/DBUtils/
import sys
import threading
import MySQLdb
import DBUtils.PooledDB
connargs = { "host":"localhost", "user":"user1", "passwd":"123456", "db":"test" }
def test(conn):
try:
cursor = conn.cursor()
count = cursor.execute("select * from users")
rows = cursor.fetchall()
for r in rows: pass
finally:
conn.close()
def testloop():
print ("testloop")
for i in range(1000):
conn = MySQLdb.connect(**connargs)
test(conn)
def testpool():
print ("testpool")
pooled = DBUtils.PooledDB.PooledDB(MySQLdb, **connargs)
for i in range(1000):
conn = pooled.connection()
test(conn)
def main():
t = testloop if len(sys.argv) == 1 else testpool
for i in range(10):
threading.Thread(target = t).start()
if __name__ == "__main__":
main()
看看 10 執行緒的測試結果。
$ time ./main.py
testloop
testloop
testloop
testloop
testloop
testloop
testloop
testloop
testloop
testloop
real 0m4.471s
user 0m0.570s
sys 0m4.670s
$ time ./main.py -l
testpool
testpool
testpool
testpool
testpool
testpool
testpool
testpool
testpool
testpool
real 0m2.637s
user 0m0.320s
sys 0m2.750s
雖然測試方式不是很嚴謹,但從測試結果還是能感受到 DBUtils 帶來的效能提升。當然,我們我們也可以在 testloop() 中一直重複使用一個不關閉的 Connection,但這卻不適合實際開發時的情形。
DBUtils 提供了幾個引數,便於我們更好地調整資源利用。
DBUtils.PooledDB.PooledDB(self, creator,
mincached=0, maxcached=0, maxshared=0, maxconnections=0, blocking=False, maxusage=None,
setsession=None, failures=None, *args, **kwargs)
Docstring:
Set up the DB-API 2 connection pool.
creator: either an arbitrary function returning new DB-API 2
connection objects or a DB-API 2 compliant database module
mincached: initial number of idle connections in the pool
(0 means no connections are made at startup)
maxcached: maximum number of idle connections in the pool
(0 or None means unlimited pool size)
maxshared: maximum number of shared connections
(0 or None means all connections are dedicated)
When this maximum number is reached, connections are
shared if they have been requested as shareable.
maxconnections: maximum number of connections generally allowed
(0 or None means an arbitrary number of connections)
blocking: determines behavior when exceeding the maximum
(if this is set to true, block and wait until the number of
connections decreases, otherwise an error will be reported)
maxusage: maximum number of reuses of a single connection
(0 or None means unlimited reuse)
When this maximum usage number of the connection is reached,
the connection is automatically reset (closed and reopened).
setsession: optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
failures: an optional exception class or a tuple of exception classes
for which the connection failover mechanism shall be applied,
if the default (OperationalError, InternalError) is not adequate
args, kwargs: the parameters that shall be passed to the creator
function or the connection constructor of the DB-API 2 module
DBUtils 僅提供給了連線池管理,實際的資料庫操作依然是由符合 DB-API 2 標準的目標資料庫模組完成的。
一個面向物件使用DBUtils的栗子
# coding=utf-8
"""
使用DBUtils資料庫連線池中的連線,操作資料庫
OperationalError: (2006, ‘MySQL server has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql
class MysqlClient(object):
__pool = None;
def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
maxusage=100, setsession=None, reset=True,
host='127.0.0.1', port=3306, db='test',
user='root', passwd='123456', charset='utf8mb4'):
"""
:param mincached:連線池中空閒連線的初始數量
:param maxcached:連線池中空閒連線的最大數量
:param maxshared:共享連線的最大數量
:param maxconnections:建立連線池的最大數量
:param blocking:超過最大連線數量時候的表現,為True等待連線數量下降,為false直接報錯處理
:param maxusage:單個連線的最大重複使用次數
:param setsession:optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
:param reset:how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param host:資料庫ip地址
:param port:資料庫埠
:param db:庫名
:param user:使用者名稱
:param passwd:密碼
:param charset:字元編碼
"""
if not self.__pool:
self.__class__.__pool = PooledDB(pymysql,
mincached, maxcached,
maxshared, maxconnections, blocking,
maxusage, setsession, reset,
host=host, port=port, db=db,
user=user, passwd=passwd,
charset=charset,
cursorclass=pymysql.cursors.DictCursor
)
self._conn = None
self._cursor = None
self.__get_conn()
def __get_conn(self):
self._conn = self.__pool.connection();
self._cursor = self._conn.cursor();
def close(self):
try:
self._cursor.close()
self._conn.close()
except Exception as e:
print e
def __execute(self, sql, param=()):
count = self._cursor.execute(sql, param)
print count
return count
@staticmethod
def __dict_datetime_obj_to_str(result_dict):
"""把字典裡面的datatime物件轉成字串,使json轉換不出錯"""
if result_dict:
result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
result_dict.update(result_replace)
return result_dict
def select_one(self, sql, param=()):
"""查詢單個結果"""
count = self.__execute(sql, param)
result = self._cursor.fetchone()
""":type result:dict"""
result = self.__dict_datetime_obj_to_str(result)
return count, result
def select_many(self, sql, param=()):
"""
查詢多個結果
:param sql: qsl語句
:param param: sql引數
:return: 結果數量和查詢結果集
"""
count = self.__execute(sql, param)
result = self._cursor.fetchall()
""":type result:list"""
[self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
return count, result
def execute(self, sql, param=()):
count = self.__execute(sql, param)
return count
def begin(self):
"""開啟事務"""
self._conn.autocommit(0)
def end(self, option='commit'):
"""結束事務"""
if option == 'commit':
self._conn.autocommit()
else:
self._conn.rollback()
if __name__ == "__main__":
mc = MysqlClient()
sql1 = 'SELECT * FROM shiji WHERE id = 1'
result1 = mc.select_one(sql1)
print json.dumps(result1[1], ensure_ascii=False)
sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
param = (2, 3, 4)
print json.dumps(mc.select_many(sql2, param)[1], ensure_ascii=False)