1. 程式人生 > >python3+ 連線並操作mysql 資料庫,支援讀寫分離

python3+ 連線並操作mysql 資料庫,支援讀寫分離

最近,看了關於的一篇帖子,關於python的,決定花幾天看下python的基本知識。看了基礎知識後,發現記不住,就隨手寫了點程式碼,加深自己的記憶。

暫時主要寫了對資料庫的操作,其他的都還沒有做,後續會逐步完善,程式碼寫的很基礎,裡面肯定還存在很多問題,自己儲存起,怕後面丟失了。

主要是 pyMysql庫 實現了對mysql資料庫的操作,目前支援讀寫分離。


1、app_db.py對資料庫的配置。 支援讀寫分離。

def dbconfig() :
    config = dict();
    config['master'] = {
        "write" : {
             'host' : 'localhost',
             'dbuser' : 'root',
             'dbpwd' : '',
             'port' : 3306,
             'dbname' : 'liuyan',
             'dbprefix' : 'ly_',
        },
        "read" : {
            'host': 'localhost',
            'dbuser': 'root',
            'dbpwd': '',
            'port': 3306,
            'dbname': 'liuyan',
            'dbprefix': 'ly_',
        }
    };

    config['server'] = {
        "write": {
            'host': 'localhost',
            'dbuser': 'root',
            'dbpwd': '',
            'port': 3306,
            'dbname': 'liuyanben',
            'dbprefix': 't_',
        },
        "read": {
            'host': 'localhost',
            'dbuser': 'root',
            'dbpwd': '',
            'port': 3306,
            'dbname': 'liuyanben',
            'dbprefix': 't_',
        }
    };
    return config;

2、app_db.py 實現對資料庫的操作,支援讀寫分離。
#@author  SteveGao


import sys;
import pymysql;
import random;
from config.app_db import dbconfig;

class DB :
    link = dict();
    _res = '';
    _mode = "read";# write,read
    _dbconfig = dbconfig();
    _host = '';
    def __init__(self,host = "master"):
        self._host = host;

    def connect(self,mode = "read"):
        try :
            self._mode = mode;
            self._dbconfig = self._dbconfig[self._host][self._mode];
            self.host = self._dbconfig['host'];
            self.dbuser = self._dbconfig['dbuser'];
            self.password = self._dbconfig['dbpwd'];
            self.dbname = self._dbconfig['dbname'];
            self.port = self._dbconfig['port'];
            self._res = pymysql.cursors.DictCursor;
            self.link[self._host] = pymysql.connect(self.host,self.dbuser,self.password,self.dbname,self.port);
        except Exception as err:
            print("DBERR資料庫連線失敗: \n[%s]" %(err));
            exit();

    def getTableName(self,tableName):
        return self._dbconfig['dbprefix'] + tableName;


    '''
    查詢單張表的一條記錄
    '''
    def getOne(self,tableName,id):
        try :
            self.connect('read');
            tableName = self.getTableName(tableName);
            cursors = self.link[self._host].cursor(self._res);
            sql = "SELECT * FROM %s WHERE id = %d LIMIT 1";
            cursors.execute(sql %(tableName,int(id)));
            result = cursors.fetchone();
            return result;
        except Exception as err:
            print("DBERR資料庫查詢單條記錄失敗: \n[%s]" % (err));
            exit();

    '''
    根據條件查詢單表的內容
    '''
    def getCondition(self,tableName, condition = ''):
        try :
            self.connect('read');
            cursors = self.link[self._host].cursor(self._res);
            tableName = self.getTableName(tableName);
            if (condition):
                sql = "SELECT * FROM %s WHERE %s";
                cursors.execute(sql % (tableName, condition));
            else :
                sql = "SELECT * FROM %s;";
                cursors.execute(sql % (tableName));

            result = cursors.fetchall();
            result = list(result);
            return result;
        except Exception as err:
            print("DBERR資料庫按條件查詢記錄失敗: \n[%s]" % (err));
            exit();


    '''
    根據SQL語句查詢
    '''
    def executeSql(self,sql,findOne = 'm'):
        try :
            sql = sql.lstrip();
            if (len(sql) == 0):
                return False;

            operate = sql[0:6].upper();
            if (operate == "SELECT"):
                self._mode = 'read';
            elif (operate == 'INSERT' or operate == "UPDATE" or operate == 'DELETE') :
                self._mode = 'write';
            else :
                self._mode = 'write';

            self.connect(self._mode);
            cursors = self.link[self._host].cursor(self._res);

            execs = cursors.execute(sql);
            if (operate == "SELECT") :
                if ((findOne == 'm') and ("LIMIT 1" not in sql.upper())) :
                    result = cursors.fetchall();
                else :
                    result = cursors.fetchone();
                return result;
            else :
                return execs;

        except Exception as err:
            print("DBERR資料庫執行失敗: \n[%s]" % (err));
            exit();

    '''
    使用字典插入單條資料
    link = DB('localhost','root','','liuyan');
    data = {"user_name":"stevegao","password" : "444444",'create_time':"2017-02-12 09:12:12", 'login_times' : "1"};
    link.insertAll('ly_user',data);
    '''
    def insert(self,table,data):
        try :
            self.connect('write');
            table = self.getTableName(table);
            tinsert = "INSERT INTO %s";
            fieldList = ' (';
            valueList = ' (';
            for field in data :
                fieldList += "`" + field + "`,";
                valueList += '"' + data[field] + '",';
            fieldList = fieldList[0:-1] + ") ";
            valueList = valueList[0:-1] + ") ";
            sql = tinsert + fieldList + "values " + valueList;
            cursors = self.link[self._host].cursor(self._res);
            insert = cursors.execute(sql %(table));
            print(sql %(table));
            #self.link.commit();
            return insert;
        except Exception as err :
            self.link.rollback();
            print("DBERR資料庫插入單條記錄失敗:\n[%s]" % (err));
            exit();


    '''
    使用字典,批量插入資料到資料庫
    link = DB('localhost','root','','liuyan');
    data = dict();
    data[0] = {"user_name":"stevegao","password" : "444444",'create_time':"2017-02-12 09:12:12", 'login_times' : "1"};
    data[1] = {"user_name":"jennygao","password" : "555555",'create_time':"2017-02-12 09:12:12", 'login_times' : "1"};
    ii = link.insertAll('ly_user',data);
    '''

    def insertAll(self,table,data):
        try :
            self.connect('write');
            table = self.getTableName(table);
            tinsert = "INSERT INTO %s";
            fieldList = ' (';
            values = '';
            oneField = data[0];
            for field in oneField:
                fieldList += "`" + field + "`,";
            fieldList = fieldList[0:-1] + ") ";

            for row in data :
                valueList = '(';
                for field in data[row] :
                    valueList += '"' + data[row][field] + '",';
                values += valueList[0:-1] + "),";


            sql = tinsert + fieldList + "values " + values[0:-1];
            cursors = self.link[self._host].cursor(self._res);
            insert = cursors.execute(sql %(table));
            #self.link.commit();
            return insert;
        except Exception as err:
            self.link.rollback();
            print("DBERR資料庫批量插入記錄失敗:\n[%s]" % (err));
            exit();

    def commit(self):
        self.link[self._host].commit();

    def rollback(self):
        self.link[self._host].rollback();

    '''
    關閉資料庫連結
    '''
    def close(self):
        self.link[self._host].close();




3、app_model.py 支援鏈式操作,讓開發變得 簡單。
#@author  SteveGao

from gyk.database.app_db import DB;
from config.app_db import dbconfig;



'''
向資料庫插入單條資料
model = Model();
data = {"user_name":"stevegao","password" : "444444",'create_time':"2017-02-12 09:12:12", 'login_times' : "1"};
insert = model.table('ly_user').insert(data);
if (insert) :
    model.db.link.commit(); //提交事務
else :
    model.db.link.rollback(); //回滾事務
'''

class Model :
    lastsql = '';
    _wherestr = '';
    _orderstr = '';
    _groupstr = '';
    _limitstr = '';
    _fieldstr = '';
    _tablename = '';
    _having = '';
    _host = "";

    db = {};

    def __init__(self,host = "master"):

        self._host = host;
        mysql = DB(host.lower()); #例項化資料庫類
        self.db[self._host] = mysql;

    def max(self,field):
        sql = self._parseStatis('max',field);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;

    def min(self,field):
        sql = self._parseStatis('min',field);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;

    def avg(self,field):
        sql = self._parseStatis('avg',field);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;

    def count(self,field):
        sql = self._parseStatis('count',field);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;


    def sum(self,field):
        sql = self._parseStatis('sum',field);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;


    def table(self,tablename):
        pconfig = dbconfig();
        prefix = pconfig[self._host]['write']['dbprefix'];
        self._tablename = "`" + prefix + tablename + "`";
        return self;

    def where(self,where = ""):
        if (where) :
            self._wherestr = self._parseWhereDict(where);
        else :
            self._wherestr = where;
        return self;

    def orderBy(self,order = ""):
        self._orderstr = order;
        return self;

    def groupBy(self,groupby = ""):
        self._groupstr = groupby;
        return self;

    def having(self,having = ""):
        self._having = having;
        return self;

    def limit(self,limit = ""):
        self._limitstr = limit;
        return self;

    def field(self,field = "*"):
        self._fieldstr = field;
        return self;

    def select(self):
        sql = self._parseQuery();
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;

    def find(self):
        sql = self._parseQuery(1);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql,'s');
        return result;

    def query(self,sql):
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;

    '''
    插入單條資料,data型別為字典型
    '''
    def insert(self,data):
        sql = self._parseInsert(data);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;

    '''
    批量插入資料,data型別為字典型
    '''
    def insertAll(self,data):
        sql = self._parseInsertAll(data);
        if (sql == False) :
            return sql;
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql);
        return result;

    def delete(self):
        sql = self._parseDelete();
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql, 's');
        return result;

    def update(self,data):
        sql = self._parseSetDict(data);
        self.lastsql = sql;
        result = self.db[self._host].executeSql(sql, 's');
        return result;

    '''
    提交事務
    '''

    def commit(self):
        self.db[self._host].commit();

    '''
    提交回滾
    '''

    def rollback(self):
        self.db[self._host].rollback();

    '''
    解析查詢語句
    '''
    def _parseQuery(self,limit = 2):
        tempsql = "";
        tempsql = "SELECT ";
        if (self._fieldstr) :
            tempsql += self._fieldstr;
        else :
            tempsql += "*";
        tempsql += " FROM ";

        if (self._tablename) :
            tempsql += self._tablename;

        if (self._wherestr):
            tempsql += " WHERE ";
            tempsql += self._wherestr;

        tempsql += " ";

        if (self._groupstr):
            tempsql += " GROUP BY ";
            tempsql += self._groupstr;

        if (self._having) :
            tempsql += " HAVING ";
            tempsql += self._having;

        if (self._orderstr):
            tempsql += " ORDER BY ";
            tempsql += self._orderstr;
        tempsql += " ";

        if (limit == 2) :
            if (self._limitstr):
                tempsql += " LIMIT ";
                tempsql += self._limitstr;
        else :
            tempsql += " LIMIT 1";

        return tempsql;


    def _parseInsert(self,data):
        tinsert = "";
        tinsert += "INSERT INTO ";

        if (self._tablename) :
            tinsert += self._tablename;

        fieldList = ' (';
        valueList = ' (';
        for field in data:
            fieldList += "`" + field + "`,";
            valueList += '"' + data[field] + '",';
        fieldList = fieldList[0:-1] + ") ";
        valueList = valueList[0:-1] + ") ";
        sql = tinsert + fieldList + "values " + valueList;
        return sql;

    def _parseInsertAll(self,data):
        if (data.get(0) == None) :
            return False;
        tinsert = "";
        tinsert += "INSERT INTO ";
        if (self._tablename) :
            tinsert += "`" +self._tablename + "`";
        fieldList = ' (';
        values = '';
        oneField = data[0];
        for field in oneField:
            fieldList += "`" + field + "`,";
        fieldList = fieldList[0:-1] + ") ";

        for row in data:
            valueList = '(';
            for field in data[row]:
                valueList += '"' + data[row][field] + '",';
            values += valueList[0:-1] + "),";

        sql = tinsert + fieldList + "VALUES " + values[0:-1];
        return sql;

    '''
        解析查詢語句
        '''

    def _parseStatis(self,func,field):
        tempsql = "";
        tempsql = "SELECT ";
        if (field):
            tempsql += func +"(" + field +")";
        else:
            tempsql += func +"(" + 1 +")";
        tempsql += " FROM ";

        if (self._tablename):
            tempsql += self._tablename;

        if (self._wherestr):
            tempsql += " WHERE ";
            tempsql += self._wherestr;

        tempsql += " ";

        if (self._groupstr):
            tempsql += " GROUP BY ";
            tempsql += self._groupstr;

        if (self._having) :
            tempsql += " HAVING ";
            tempsql += self._having;

        if (self._orderstr):
            tempsql += " ORDER BY ";
            tempsql += self._orderstr;
        tempsql += " ";
        if (self._groupstr == "") :
            tempsql += " LIMIT 1";

        return tempsql;


    def _parseDelete(self):

        tempsql = "";
        tempsql = "DELETE ";
        tempsql += " FROM ";

        if (self._tablename):
            tempsql += self._tablename;

        if (self._wherestr):
            tempsql += " WHERE ";
            tempsql += self._wherestr;

        tempsql += " ";

        if (self._orderstr):
            tempsql += " ORDER BY ";
            tempsql += self._orderstr;
        tempsql += " ";

        if (self._limitstr):
            tempsql += " LIMIT ";
            tempsql += self._limitstr;

        return tempsql;

    def _parseUpdate(self,data):

        if (not data) :
            return False;

        tempsql = "";
        tempsql = "UPDATE ";

        if (self._tablename):
            tempsql += self._tablename;

        tempsql += " SET ";

        tempsql += self._parseDict(data);

        if (self._wherestr):
            tempsql += " WHERE ";
            tempsql += self._wherestr;

        tempsql += " ";

        if (self._orderstr):
            tempsql += " ORDER BY ";
            tempsql += self._orderstr;
        tempsql += " ";

        if (self._limitstr):
            tempsql += " LIMIT ";
            tempsql += self._limitstr;

        return tempsql;

    def _parseSetDict(self,data):
        result = "";
        if (isinstance(data, dict)):
            temp = "";
            for field in data :
                temp += '`' + field + '` = "' + data[field] + '",';
            result = temp[0:-1];
        else :
            result = data;
        return  result;

    def _parseWhereDict(self,data):
        result = "";
        if (isinstance(data, dict)):
            temp = "";
            for field in data :
                temp += '`' + field + '` = "' + data[field] + '" AND ';
            result = temp.rstrip()[0:-3];
        else :
            result = data;
        return  result;