flask - 資料庫連線池
阿新 • • 發佈:2020-08-06
flask
詳細
-
程式語言及他們之間的區別?
C/C++ ,很多語言的底層實現都是與C,程式碼執行效率高,自己做記憶體管理,對程式碼要求比較高,很多功能需要手動試下。 Java,比較好的程式語言,很多企業級應用都會選擇java。 C#,是微軟開始的程式語言,部署時需要放在windown server 上,最大弊端是window系統花錢。 PHP,一般用於快速搭建網站。 Python,簡潔且入門簡單,很少的程式碼就可以做很多事情。 Golang,語法和C比較接近,處理併發時比較有優勢 + docker。
1.資料庫連線池
1.1 初級階段
import pymysql from flask import Flask app = Flask(__name__) def fetchall(sql): conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.execute(sql) result = cursor.fetchall() cursor.close() conn.close() return result @app.route('/login') def login(): result = fetchall('select * from user') return 'login' @app.route('/index') def index(): result = fetchall('select * from user') return 'xxx' @app.route('/order') def order(): result = fetchall('select * from user') return 'xxx' if __name__ == '__main__': app.run()
1.2 老闆給修改
import pymysql from flask import Flask app = Flask(__name__) CONN = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') def fetchall(sql): cursor = CONN.cursor() cursor.execute(sql) result = cursor.fetchall() cursor.close() return result @app.route('/login') def login(): result = fetchall('select * from user') return 'login' @app.route('/index') def index(): result = fetchall('select * from user') return 'xxx' @app.route('/order') def order(): result = fetchall('select * from user') return 'xxx' if __name__ == '__main__': app.run()
1.3 資料庫連線池
安裝
pip3 install dbutils
pip3 install pymysql
使用
import pymysql from DBUtils.PooledDB import PooledDB POOL = PooledDB( creator=pymysql, # 使用連結資料庫的模組 maxconnections=6, # 連線池允許的最大連線數,0和None表示不限制連線數 mincached=2, # 初始化時,連結池中至少建立的連結,0表示不建立 blocking=True, # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯 ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='222', database='cmdb', charset='utf8' ) # 去連線池中獲取一個連線 conn = POOL.connection() cursor = conn.cursor() cursor.execute('select * from web_models_disk') result = cursor.fetchall() cursor.close() # 將連線放會到連線池 conn.close() print(result)
多執行緒測試
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用連結資料庫的模組
maxconnections=6, # 連線池允許的最大連線數,0和None表示不限制連線數
mincached=2, # 初始化時,連結池中至少建立的連結,0表示不建立
blocking=True, # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯
ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='222',
database='cmdb',
charset='utf8'
)
def task(num):
# 去連線池中獲取一個連線
conn = POOL.connection()
cursor = conn.cursor()
# cursor.execute('select * from web_models_disk')
cursor.execute('select sleep(3)')
result = cursor.fetchall()
cursor.close()
# 將連線放會到連線池
conn.close()
print(num,'------------>',result)
from threading import Thread
for i in range(57):
t = Thread(target=task,args=(i,))
t.start()
基於函式實現sqlhelper
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用連結資料庫的模組
maxconnections=6, # 連線池允許的最大連線數,0和None表示不限制連線數
mincached=2, # 初始化時,連結池中至少建立的連結,0表示不建立
blocking=True, # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯
ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='222',
database='cmdb',
charset='utf8'
)
def fetchall(sql,*args):
""" 獲取所有資料 """
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql,args)
result = cursor.fetchall()
cursor.close()
conn.close()
return result
def fetchone(sql, *args):
""" 獲取單挑資料 """
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
cursor.close()
conn.close()
return result
from flask import Flask
import sqlhelper
app = Flask(__name__)
@app.route('/login')
def login():
result = sqlhelper.fetchone('select * from web_models_admininfo where username=%s ','wupeiqi')
print(result)
return 'login'
@app.route('/index')
def index():
result = sqlhelper.fetchall('select * from web_models_disk')
print(result)
return 'xxx'
@app.route('/order')
def order():
# result = fetchall('select * from user')
return 'xxx'
if __name__ == '__main__':
app.run()
基於類實現sqlhelper
import pymysql
from DBUtils.PooledDB import PooledDB
class SqlHelper(object):
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用連結資料庫的模組
maxconnections=6, # 連線池允許的最大連線數,0和None表示不限制連線數
mincached=2, # 初始化時,連結池中至少建立的連結,0表示不建立
blocking=True, # 連線池中如果沒有可用連線後,是否阻塞等待。True,等待;False,不等待然後報錯
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='222',
database='cmdb',
charset='utf8'
)
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn,cursor
def close(self,cursor,conn):
cursor.close()
conn.close()
def fetchall(self,sql, *args):
""" 獲取所有資料 """
conn,cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn,cursor)
return result
def fetchone(self,sql, *args):
""" 獲取所有資料 """
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
db = SqlHelper()
from flask import Flask
from sqlhelper2 import db
app = Flask(__name__)
@app.route('/login')
def login():
# db.fetchone()
return 'login'
@app.route('/index')
def index():
# db.fetchall()
return 'xxx'
@app.route('/order')
def order():
# db.fetchall()
conn,cursor = db.open()
# 自己做操作
db.close(conn,cursor)
return 'xxx'
if __name__ == '__main__':
app.run()
2.補充知識點:上下文管理
class Foo(object):
def __enter__(self):
return 123
def __exit__(self, exc_type, exc_val, exc_tb):
pass
obj = Foo()
with obj as f:
print(f)
class Foo(object):
def do_somthing(self):
pass
def close(self):
pass
class Context:
def __enter__(self):
self.data = Foo()
return self.data
def __exit__(self, exc_type, exc_val, exc_tb):
self.data.close()
with Context() as ctx:
ctx.do_somthing()