1. 程式人生 > >Redis和MySQL命令封裝,連線池以及特殊場景下的封裝

Redis和MySQL命令封裝,連線池以及特殊場景下的封裝

由於專案需求以及模組化的需要,此次初版中簡單實現了MySQL的封裝的升級版,和redis與MySQL的互動,不是很成熟先寫出來慢慢改。

配置:

import hashlib
import redis
import pymysql
import time
import json
from DBUtils.PooledDB import PooledDB
"""
利用redis的集合不允許新增重複元素來進行去重
"""
b = {"user": "root",
     "passwd": "Lohas123",
     "host": "121.40.52.101",
     "db": 'price',
     "port": 3306,
     "charset": 'utf8'
     }
class Sql_all:
    __mysql_pool=None
    __redis=None
    #redis連線池初始化配置
    def __init__(self):
        self.my_coon = Sql_all.getmysqlconn()
        self.my_cur = self.my_coon.cursor(cursor=pymysql.cursors.DictCursor)
        self.re_con=Sql_all.getredisconn()

    # mysql連線池
    @staticmethod
    def getmysqlconn():
        if Sql_all.__mysql_pool is None:
            __mysql_pool = PooledDB(creator=pymysql, mincached=3, maxcached=20, host=b["host"], user=b["user"],
                              passwd=b["passwd"], db=b["db"], port=b["port"], charset=b["charset"])
        return __mysql_pool.connection()
    
    #redis連線池
    @staticmethod
    def getredisconn():
        if Sql_all.__redis is None:
            r= redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)#,password='Lohas3520'
            __redis_pool = redis.StrictRedis(connection_pool=r)
        return __redis_pool

MySQL封裝

#Mysql insert/update/delete
    def mysql_insert(self, sql='',value=(),insert_statue=1,_id=None):
        """
        實現資料庫insert/update/delete
        :param _id就是在只插入一個的情況下返回一個id並改變_id值,不用傳值預設None,
        :param insert_statue: 插入狀態int值,1為單詞插入一個,2為插入多個值,三或以上為刪除或修改模式,預設為1
        :param sql: 插入的sql格式
        :param value: 當插入一個值時為tuple/list,多個值時為tuple(tuple)/list[list],非插入狀態下value可為()
        :return:
        """
        try:
            print('op_insert',sql)
            if insert_statue==1:
                self.my_cur.execute(sql,value)
                _id = self.my_cur.lastrowid#獲取插入的id
            elif insert_statue==1:
                self.my_cur.executemany(sql,value)
            else:
                self.my_cur.execute(sql, value)
            self.my_coon.commit()
            self.dispose()
            if _id == 0:
                return True
            return True,_id
        except Exception as e:
            print('insert/update/delete except   ', e.args)
            self.my_coon.rollback()
            self.dispose()
            return 0

    #Mysql查詢
    def mysql_select(self,sql='',select_satue=2,num=None,param=None):
        """
        實現MySQL資料庫取的功能
        :param sql:sql查詢語句如有體條件,把條件傳入param
        :param param:可選引數,條件列表值(元祖/列表),預設None
        :param num:當只需要查詢一部分時候,預設None
        :param select_satue,可選狀態int型別(或能被轉成int得值),1為查詢一條
                   2為查詢所有,3或其他為查詢一部分(三種狀態都是在指定條件下
                   返回的數量)此時需傳入引數num,預設2查詢所有
        :return:
        """
        try:
            if param is None:
                count=self.my_cur.execute(sql)  # 執行sql
            else:
                count =self.my_cur.execute(sql,param)
            if count >0:
                if int(select_satue)==2:
                    select_res = self.my_cur.fetchall()# 返回結果為字典
                elif int(select_satue)==1:
                    select_res=self.my_cur.fetchone()
                else:
                    select_res=self.my_cur.fetchmany(num)
            else:
                select_res=False
            self.dispose()
            return select_res
        except Exception as e:
            print('select except ',e.args)
            self.dispose()
            return None
            
    #釋放資源
    def dispose(self):
        self.my_coon.close()
        self.my_cur.close()

redis操作

#redis壓縮url
    def compressed(self,value):
        sha1obj = hashlib.sha1()
        sha1obj.update(value.encode('utf-8'))
        hash_value = sha1obj.hexdigest()
        return hash_value

    #redis操作(需要被管道監控的前提),0模式為長度對比去重(利用redis管道特性
          鎖住url或關鍵字進行去重,在把資料持久化到MySQL),1模式為分頁計數
          (先利用redis管道鎖住page在用MySQL分頁)
    def redis_operation(self,key,sql_statue,value=None):
        """
        :param sql_statue操作redis的模式
        :param r:redis連線
        :param value:add的資料或者新增的頁碼數,先壓縮在新增
        :param key:專案所使用的集合名稱,建議如下格式:
                    ”projectname:task_remove_repeate“
        :return:
        """
        pageindexs=0
        while True:
            try:
                pipe = self.getredisconn().pipeline()
                pipe.watch(key)
                # 開啟事務
                pipe.multi()
                #去重模式,長度對比去重
                if sql_statue==0:
                    hash_value = self.compressed(value)
                    long_1 = self.re_con.scard(key)
                    self.re_con.sadd(key, hash_value)
                    long_2 = self.re_con.scard(key)
                    return long_1, long_2
                #翻頁模式
                elif sql_statue==1:
                    pageindex=self.re_con.get(key)
                    pageindexs += int(value)
                    self.re_con.set(key,pageindexs)
                    return pageindex
            except Exception as e:
                print(e)
                continue
    #新增到redis中,資料需要去重的情況使用set比較好
    #def add_redis(self,url, set_name,r=None):
       # r.sadd(set_name,url)

redis資料到MySQL

    #實時取資料到mysql,超過30分鐘退出(此狀態是資料下載完畢才會有)這個要寫到爬蟲中去。還沒想好如何和上面insert部分結合
    def redis_put(self,key):
        num=0
        while True:
            if num>=600:
                break
            if self.re_con.scard(key)>0:
                num=0
                h =self.re_con.spop(key)
                try:
                    result=json.loads(str(h,encoding='utf-8').replace("'", "\""))
                except Exception as e:
                    print(e)
                    self.error_file(h)
                    continue
            else:
                num+=1
                time.sleep(5)

    # #上文json.loads()處理時未明原因報錯,此錯誤線下實驗無問題,此處處理異常跳過,收集出錯項。###錯誤待考證
    def error_file(self,h):
        with open('elong_error_file.txt','w',encoding="utf-8") as f:
            f.write(str(h,encoding='utf-8').replace("'", "\""))

    def redis_close(self,pool):
        """
        釋放redis連線池
        :param pool:
        :return:
        """
        pool.disconnect()

由於此篇文章還沒寫完,目標了也是封裝程一個sql元件,當然可能會有些雞肋,畢竟還有ORM 這個東西,在加上實際上很多東西是已有的,此處了只是想看能不能滿足工作需求。不完善的地方請多多指教