python3+ 連線並操作mysql 資料庫,支援讀寫分離
阿新 • • 發佈:2019-02-06
最近看了一篇關於 python 的帖子,決定花幾天看下 python 的基本知識。看了基礎知識後,發現記不住,就隨手寫了點程式碼,加深自己的記憶。
暫時主要寫了對資料庫的操作,其他的都還沒有做,後續會逐步完善,程式碼寫的很基礎,裡面肯定還存在很多問題,自己儲存起,怕後面丟失了。
主要是使用 PyMySQL 庫實現對 MySQL 資料庫的操作,目前支援讀寫分離。
1、config/app_db.py:資料庫的配置。每個主機組分別配置 write(寫)與 read(讀)兩個節點,以支援讀寫分離。
def dbconfig():
    """Return the connection settings for every configured host group.

    The result maps a host-group name ('master', 'server') to a dict with a
    "write" and a "read" node. Each node carries the keys 'host', 'dbuser',
    'dbpwd', 'port', 'dbname' and 'dbprefix'. A fresh structure is built on
    every call, so callers may modify what they receive.
    """

    def node(dbname, dbprefix):
        # Every node shares the same local server and credentials; only the
        # schema name and the table prefix differ between host groups.
        return {
            'host': 'localhost',
            'dbuser': 'root',
            'dbpwd': '',
            'port': 3306,
            'dbname': dbname,
            'dbprefix': dbprefix,
        }

    return {
        'master': {
            "write": node('liuyan', 'ly_'),
            "read": node('liuyan', 'ly_'),
        },
        'server': {
            "write": node('liuyanben', 't_'),
            "read": node('liuyanben', 't_'),
        },
    }
2、gyk/database/app_db.py:實現對資料庫的操作(DB 類),支援讀寫分離。
# Article heading kept from the original line:
# 3. app_model.py supports chained calls, which keeps development simple.
# @author SteveGao
import random
import re
import sys

import pymysql

from config.app_db import dbconfig


class DB:
    """Small PyMySQL wrapper that sends reads and writes to separate nodes.

    A host group (for example "master") is looked up in ``dbconfig()``; each
    group provides a "read" and a "write" node. SELECT statements use the
    read node, everything else uses the write node.

    Connections are opened lazily and reused, one per mode and per instance.
    Read connections run with autocommit enabled so that a reused connection
    does not keep serving an old transaction snapshot. Write connections keep
    PyMySQL's default (autocommit off): call ``commit()`` or ``rollback()``.

    Error handling is unchanged from the original design: every public
    method prints a ``DBERR`` message and terminates the process
    (``SystemExit``) when the database operation fails.

    Example::

        db = DB('master')
        row = {"user_name": "stevegao", "password": "444444",
               "create_time": "2017-02-12 09:12:12", "login_times": "1"}
        db.insert('user', row)      # table prefix is added automatically
        db.commit()
    """

    # Most recently selected connection per host group. This dict lives on
    # the class, so it is shared by every DB instance.
    link = dict()
    _res = ''
    _mode = "read"  # write, read
    # Complete configuration for all host groups; never overwritten.
    _dbconfig = dbconfig()
    _host = ''

    # "LIMIT 1" as a whole number and not as the offset of "LIMIT 1, n".
    _LIMIT_ONE = re.compile(r"\bLIMIT\s+1\b(?!\s*,)", re.IGNORECASE)

    def __init__(self, host="master"):
        """Remember the host group; no connection is opened yet."""
        self._host = host
        self._conns = {}   # mode ("read"/"write") -> open connection
        self._node = None  # settings of the node selected by connect()

    def connect(self, mode="read"):
        """Select the connection for ``mode``, opening it if necessary.

        Args:
            mode: "read" or "write"; must be a key of the host group's
                configuration.
        """
        try:
            self._mode = mode
            # Keep the full configuration intact. The original replaced
            # self._dbconfig with the node, which broke the next connect().
            node = self._dbconfig[self._host][mode]
            self._node = node
            self.host = node['host']
            self.dbuser = node['dbuser']
            self.password = node['dbpwd']
            self.dbname = node['dbname']
            self.port = node['port']
            self._res = pymysql.cursors.DictCursor

            conn = self._conns.get(mode)
            if conn is None or not conn.open:
                # Keyword arguments: PyMySQL 1.0+ no longer accepts
                # positional connection parameters.
                conn = pymysql.connect(
                    host=self.host,
                    user=self.dbuser,
                    password=self.password,
                    database=self.dbname,
                    port=self.port,
                    autocommit=(mode == "read"),
                )
                self._conns[mode] = conn
            self.link[self._host] = conn
        except Exception as err:
            self._abort("DBERR資料庫連線失敗: \n[%s]", err)

    def getTableName(self, tableName):
        """Return ``tableName`` with the configured table prefix prepended."""
        node = self._node
        if node is None:
            # Not connected yet: fall back to the node of the current mode.
            node = self._dbconfig[self._host][self._mode]
        return node['dbprefix'] + tableName

    def getOne(self, tableName, id):
        """Fetch the row of ``tableName`` whose ``id`` column equals ``id``.

        Returns:
            The row as a dict, or None when no row matches.
        """
        try:
            self.connect('read')
            sql = "SELECT * FROM %s WHERE id = %%s LIMIT 1" % self.getTableName(tableName)
            with self.link[self._host].cursor(self._res) as cursor:
                cursor.execute(sql, (int(id),))
                return cursor.fetchone()
        except Exception as err:
            self._abort("DBERR資料庫查詢單條記錄失敗: \n[%s]", err)

    def getCondition(self, tableName, condition='', params=None):
        """Fetch all rows of ``tableName`` that satisfy ``condition``.

        Args:
            tableName: table name without prefix.
            condition: raw SQL placed after WHERE; empty selects every row.
                It is inserted verbatim, so never build it from untrusted
                input. Use ``%s`` placeholders together with ``params``.
            params: optional sequence of values bound to the ``%s``
                placeholders in ``condition``. When given, a literal ``%``
                in ``condition`` must be written as ``%%``.

        Returns:
            A list of row dicts.
        """
        try:
            self.connect('read')
            sql = "SELECT * FROM %s" % self.getTableName(tableName)
            if condition:
                sql += " WHERE %s" % condition
            with self.link[self._host].cursor(self._res) as cursor:
                cursor.execute(sql, params)
                return list(cursor.fetchall())
        except Exception as err:
            self._abort("DBERR資料庫按條件查詢記錄失敗: \n[%s]", err)

    def executeSql(self, sql, findOne='m', params=None):
        """Run one SQL statement on the node that matches its type.

        Args:
            sql: the statement. SELECT goes to the read node, anything else
                to the write node.
            findOne: 'm' returns all rows of a SELECT; any other value
                returns only the first row. A SELECT limited to one row
                ("LIMIT 1") always returns a single row.
            params: optional values bound to ``%s`` placeholders in ``sql``.

        Returns:
            False for an empty statement; for SELECT the fetched row(s);
            otherwise the affected-row count reported by the cursor.
        """
        try:
            sql = sql.lstrip()
            if not sql:
                return False
            is_select = sql[0:6].upper() == "SELECT"
            self.connect('read' if is_select else 'write')
            with self.link[self._host].cursor(self._res) as cursor:
                affected = cursor.execute(sql, params)
                if not is_select:
                    return affected
                # The original substring test also matched "LIMIT 10" and
                # then silently dropped all rows but the first.
                if findOne == 'm' and not self._LIMIT_ONE.search(sql):
                    return cursor.fetchall()
                return cursor.fetchone()
        except Exception as err:
            self._abort("DBERR資料庫執行失敗: \n[%s]", err)

    def insert(self, table, data):
        """Insert one row given as a ``{column: value}`` dict.

        Values are sent as bound parameters, so quotes in the data cannot
        break the statement and non-string values are accepted. The
        statement is not committed; call ``commit()``.

        Returns:
            The affected-row count reported by the cursor.
        """
        try:
            if not data:
                raise ValueError("insert() needs at least one column")
            self.connect('write')
            fields = list(data)
            sql = "INSERT INTO %s (%s) values (%s)" % (
                self.getTableName(table),
                ",".join(self._quoteName(field) for field in fields),
                ",".join(["%s"] * len(fields)),
            )
            with self.link[self._host].cursor(self._res) as cursor:
                return cursor.execute(sql, [data[field] for field in fields])
        except Exception as err:
            self._rollbackQuietly()
            self._abort("DBERR資料庫插入單條記錄失敗:\n[%s]", err)

    def insertAll(self, table, data):
        """Insert several rows with a single multi-row INSERT.

        Args:
            table: table name without prefix.
            data: either a dict of row dicts (keys 0, 1, ... as in the
                original API) or any iterable of row dicts. The columns of
                the first row define the column list; every row must
                provide those columns.

        Returns:
            The affected-row count, or 0 when ``data`` holds no rows. The
            statement is not committed; call ``commit()``.
        """
        try:
            rows = list(data.values()) if isinstance(data, dict) else list(data)
            if not rows:
                return 0
            self.connect('write')
            fields = list(rows[0])
            group = "(" + ",".join(["%s"] * len(fields)) + ")"
            sql = "INSERT INTO %s (%s) values %s" % (
                self.getTableName(table),
                ",".join(self._quoteName(field) for field in fields),
                ",".join([group] * len(rows)),
            )
            params = []
            for row in rows:
                # Always follow the first row's column order, even when a
                # later dict lists its keys differently.
                params.extend(row[field] for field in fields)
            with self.link[self._host].cursor(self._res) as cursor:
                return cursor.execute(sql, params)
        except Exception as err:
            self._rollbackQuietly()
            self._abort("DBERR資料庫批量插入記錄失敗:\n[%s]", err)

    def commit(self):
        """Commit the pending transaction on the write connection."""
        self._writeLink().commit()

    def rollback(self):
        """Roll back the pending transaction on the write connection."""
        self._writeLink().rollback()

    def close(self):
        """Close every connection this instance has opened."""
        current = self.link.get(self._host)
        for conn in self._conns.values():
            if conn.open:
                conn.close()
            if conn is current:
                # Do not leave a closed connection in the shared registry.
                del self.link[self._host]
        self._conns.clear()

    def _writeLink(self):
        """Return the write connection, or the last selected one if none."""
        conn = self._conns.get('write')
        return conn if conn is not None else self.link[self._host]

    def _rollbackQuietly(self):
        """Best-effort rollback used right before aborting on an error."""
        conn = self._conns.get('write')
        if conn is None:
            return
        try:
            conn.rollback()
        except Exception:
            # The process is about to terminate with the original error;
            # a failing rollback must not hide that message.
            pass

    @staticmethod
    def _quoteName(name):
        """Quote a column name with backticks, doubling embedded backticks."""
        return "`" + str(name).replace("`", "``") + "`"

    @staticmethod
    def _abort(template, err):
        """Print the formatted error message and terminate the process."""
        print(template % (err,))
        sys.exit()
# @author SteveGao
from gyk.database.app_db import DB
from config.app_db import dbconfig

# Insert a single row:
#
#     model = Model()
#     data = {"user_name": "stevegao", "password": "444444",
#             "create_time": "2017-02-12 09:12:12", "login_times": "1"}
#     if model.table('user').insert(data):
#         model.commit()      # commit the transaction
#     else:
#         model.rollback()    # roll the transaction back


class Model:
    """Chainable SQL builder on top of ``DB``.

    Typical use::

        rows = Model().table('user').where({"user_name": "stevegao"}).select()

    ``table()``, ``where()``, ``orderBy()``, ``groupBy()``, ``having()``,
    ``limit()`` and ``field()`` store a clause and return ``self``. The
    terminal methods (``select``, ``find``, ``insert``, ``update``, ...)
    build the statement, keep it in ``lastsql`` and execute it.

    Stored clauses are not cleared after a statement runs, so a reused
    instance carries them into its next statement.

    String clauses (``where`` given as a string, ``orderBy``, ``having``,
    ``field`` ...) are inserted into the SQL verbatim and must not come from
    untrusted input. Values passed through dicts are escaped.
    """

    lastsql = ''
    _wherestr = ''
    _orderstr = ''
    _groupstr = ''
    _limitstr = ''
    _fieldstr = ''
    _tablename = ''
    _having = ''
    _host = ""
    # Class-level default only; every instance gets its own dict in __init__.
    db = {}

    # Characters that need a backslash escape inside a MySQL string literal.
    _ESCAPES = str.maketrans({
        "\\": "\\\\",
        '"': '\\"',
        "'": "\\'",
        "\0": "\\0",
        "\n": "\\n",
        "\r": "\\r",
        "\x1a": "\\Z",
    })

    def __init__(self, host="master"):
        """Create a model bound to the host group ``host``."""
        # Use the same lower-cased key for DB and for the config lookup in
        # table(); the original lower-cased only one of them.
        self._host = host.lower()
        self.db = {self._host: DB(self._host)}  # instantiate the database class

    def max(self, field):
        """Run ``SELECT max(field)`` with the stored clauses."""
        return self._runStatis('max', field)

    def min(self, field):
        """Run ``SELECT min(field)`` with the stored clauses."""
        return self._runStatis('min', field)

    def avg(self, field):
        """Run ``SELECT avg(field)`` with the stored clauses."""
        return self._runStatis('avg', field)

    def count(self, field):
        """Run ``SELECT count(field)`` with the stored clauses."""
        return self._runStatis('count', field)

    def sum(self, field):
        """Run ``SELECT sum(field)`` with the stored clauses."""
        return self._runStatis('sum', field)

    def table(self, tablename):
        """Select the table; the configured prefix is added automatically."""
        prefix = dbconfig()[self._host]['write']['dbprefix']
        self._tablename = self._quoteName(prefix + tablename)
        return self

    def where(self, where=""):
        """Set the WHERE clause from a raw string or a ``{column: value}`` dict."""
        self._wherestr = self._parseWhereDict(where) if where else ""
        return self

    def orderBy(self, order=""):
        """Set the ORDER BY clause (raw SQL)."""
        self._orderstr = order
        return self

    def groupBy(self, groupby=""):
        """Set the GROUP BY clause (raw SQL)."""
        self._groupstr = groupby
        return self

    def having(self, having=""):
        """Set the HAVING clause (raw SQL)."""
        self._having = having
        return self

    def limit(self, limit=""):
        """Set the LIMIT clause; accepts a string such as "0,10" or an int."""
        self._limitstr = "" if limit is None else str(limit)
        return self

    def field(self, field="*"):
        """Set the column list of the SELECT (raw SQL)."""
        self._fieldstr = field
        return self

    def select(self):
        """Execute the built SELECT and return the matching rows."""
        return self._run(self._parseQuery())

    def find(self):
        """Execute the built SELECT with ``LIMIT 1`` and return one row."""
        return self._run(self._parseQuery(1), 's')

    def query(self, sql):
        """Execute a raw SQL statement."""
        return self._run(sql)

    def insert(self, data):
        """Insert a single row; ``data`` is a ``{column: value}`` dict."""
        return self._run(self._parseInsert(data))

    def insertAll(self, data):
        """Insert several rows.

        ``data`` is a dict of row dicts keyed 0, 1, ... or a list of row
        dicts. Returns False when there is nothing to insert.
        """
        sql = self._parseInsertAll(data)
        if sql is False:
            return sql
        return self._run(sql)

    def delete(self):
        """Execute DELETE with the stored WHERE / ORDER BY / LIMIT clauses."""
        return self._run(self._parseDelete(), 's')

    def update(self, data):
        """Execute UPDATE ... SET from ``data`` with the stored clauses.

        ``data`` is a ``{column: value}`` dict or a raw SET fragment.
        Returns False when ``data`` is empty.
        """
        # The original executed only the bare SET fragment returned by
        # _parseSetDict; the full statement comes from _parseUpdate.
        sql = self._parseUpdate(data)
        if sql is False:
            return False
        return self._run(sql, 's')

    def commit(self):
        """Commit the transaction."""
        self.db[self._host].commit()

    def rollback(self):
        """Roll the transaction back."""
        self.db[self._host].rollback()

    def _run(self, sql, findOne='m'):
        """Remember ``sql`` in ``lastsql`` and execute it."""
        self.lastsql = sql
        return self.db[self._host].executeSql(sql, findOne)

    def _runStatis(self, func, field):
        """Build and execute an aggregate query."""
        return self._run(self._parseStatis(func, field))

    @staticmethod
    def _clause(keyword, value):
        """Return ``" KEYWORD value"``, or an empty string without a value."""
        return " %s %s" % (keyword, value) if value else ""

    @staticmethod
    def _quoteName(name):
        """Quote an identifier with backticks, doubling embedded backticks."""
        return "`" + str(name).replace("`", "``") + "`"

    def _quoteValue(self, value):
        """Render a Python value as an escaped, double-quoted SQL literal."""
        if value is None:
            return "NULL"
        if isinstance(value, bool):
            value = int(value)
        return '"' + str(value).translate(self._ESCAPES) + '"'

    def _parseQuery(self, limit=2):
        """Build the SELECT statement.

        ``limit == 2`` applies the stored LIMIT clause, if any; every other
        value forces ``LIMIT 1``.
        """
        sql = "SELECT " + (self._fieldstr or "*") + " FROM " + (self._tablename or "")
        sql += self._clause("WHERE", self._wherestr)
        sql += self._clause("GROUP BY", self._groupstr)
        sql += self._clause("HAVING", self._having)
        sql += self._clause("ORDER BY", self._orderstr)
        if limit == 2:
            sql += self._clause("LIMIT", self._limitstr)
        else:
            sql += " LIMIT 1"
        return sql

    def _parseInsert(self, data):
        """Build a single-row INSERT statement from a dict."""
        fields = list(data)
        return "INSERT INTO %s (%s) values (%s)" % (
            self._tablename or "",
            ",".join(self._quoteName(field) for field in fields),
            ",".join(self._quoteValue(data[field]) for field in fields),
        )

    def _parseInsertAll(self, data):
        """Build a multi-row INSERT statement, or return False without rows."""
        rows = list(data.values()) if isinstance(data, dict) else list(data or [])
        if not rows:
            return False
        # The first row defines the column list and the value order.
        fields = list(rows[0])
        groups = [
            "(" + ",".join(self._quoteValue(row[field]) for field in fields) + ")"
            for row in rows
        ]
        # _tablename is already backtick-quoted by table().
        return "INSERT INTO %s (%s) VALUES %s" % (
            self._tablename or "",
            ",".join(self._quoteName(field) for field in fields),
            ",".join(groups),
        )

    def _parseStatis(self, func, field):
        """Build ``SELECT func(field)``; an empty field becomes ``func(1)``."""
        sql = "SELECT %s(%s) FROM %s" % (func, field or "1", self._tablename or "")
        sql += self._clause("WHERE", self._wherestr)
        sql += self._clause("GROUP BY", self._groupstr)
        sql += self._clause("HAVING", self._having)
        sql += self._clause("ORDER BY", self._orderstr)
        if not self._groupstr:
            # Without GROUP BY an aggregate yields exactly one row.
            sql += " LIMIT 1"
        return sql

    def _parseDelete(self):
        """Build the DELETE statement."""
        sql = "DELETE FROM " + (self._tablename or "")
        sql += self._clause("WHERE", self._wherestr)
        sql += self._clause("ORDER BY", self._orderstr)
        sql += self._clause("LIMIT", self._limitstr)
        return sql

    def _parseUpdate(self, data):
        """Build the UPDATE statement, or return False when ``data`` is empty."""
        if not data:
            return False
        sql = "UPDATE " + (self._tablename or "") + " SET " + self._parseSetDict(data)
        sql += self._clause("WHERE", self._wherestr)
        sql += self._clause("ORDER BY", self._orderstr)
        sql += self._clause("LIMIT", self._limitstr)
        return sql

    def _parseSetDict(self, data):
        """Turn a dict into ``col = value`` pairs joined by commas.

        Anything that is not a dict is returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        return ",".join(
            self._quoteName(field) + " = " + self._quoteValue(value)
            for field, value in data.items()
        )

    def _parseWhereDict(self, data):
        """Turn a dict into ``col = value`` pairs joined by AND.

        Anything that is not a dict is returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        return " AND ".join(
            self._quoteName(field) + " = " + self._quoteValue(value)
            for field, value in data.items()
        )