Redis和MySQL命令封裝,連線池以及特殊場景下的封裝
阿新 • • 發佈:2018-12-18
由於專案需求以及模組化的需要,此次初版中簡單實現了MySQL的封裝的升級版,和redis與MySQL的互動,不是很成熟先寫出來慢慢改。
配置:
import hashlib import redis import pymysql import time import json from DBUtils.PooledDB import PooledDB """ 利用redis的集合不允許新增重複元素來進行去重 """ b = {"user": "root", "passwd": "Lohas123", "host": "121.40.52.101", "db": 'price', "port": 3306, "charset": 'utf8' } class Sql_all: __mysql_pool=None __redis=None #redis連線池初始化配置 def __init__(self): self.my_coon = Sql_all.getmysqlconn() self.my_cur = self.my_coon.cursor(cursor=pymysql.cursors.DictCursor) self.re_con=Sql_all.getredisconn() # mysql連線池 @staticmethod def getmysqlconn(): if Sql_all.__mysql_pool is None: __mysql_pool = PooledDB(creator=pymysql, mincached=3, maxcached=20, host=b["host"], user=b["user"], passwd=b["passwd"], db=b["db"], port=b["port"], charset=b["charset"]) return __mysql_pool.connection() #redis連線池 @staticmethod def getredisconn(): if Sql_all.__redis is None: r= redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)#,password='Lohas3520' __redis_pool = redis.StrictRedis(connection_pool=r) return __redis_pool
MySQL封裝
#Mysql insert/update/delete def mysql_insert(self, sql='',value=(),insert_statue=1,_id=None): """ 實現資料庫insert/update/delete :param _id就是在只插入一個的情況下返回一個id並改變_id值,不用傳值預設None, :param insert_statue: 插入狀態int值,1為單詞插入一個,2為插入多個值,三或以上為刪除或修改模式,預設為1 :param sql: 插入的sql格式 :param value: 當插入一個值時為tuple/list,多個值時為tuple(tuple)/list[list],非插入狀態下value可為() :return: """ try: print('op_insert',sql) if insert_statue==1: self.my_cur.execute(sql,value) _id = self.my_cur.lastrowid#獲取插入的id elif insert_statue==1: self.my_cur.executemany(sql,value) else: self.my_cur.execute(sql, value) self.my_coon.commit() self.dispose() if _id == 0: return True return True,_id except Exception as e: print('insert/update/delete except ', e.args) self.my_coon.rollback() self.dispose() return 0 #Mysql查詢 def mysql_select(self,sql='',select_satue=2,num=None,param=None): """ 實現MySQL資料庫取的功能 :param sql:sql查詢語句如有體條件,把條件傳入param :param param:可選引數,條件列表值(元祖/列表),預設None :param num:當只需要查詢一部分時候,預設None :param select_satue,可選狀態int型別(或能被轉成int得值),1為查詢一條 2為查詢所有,3或其他為查詢一部分(三種狀態都是在指定條件下 返回的數量)此時需傳入引數num,預設2查詢所有 :return: """ try: if param is None: count=self.my_cur.execute(sql) # 執行sql else: count =self.my_cur.execute(sql,param) if count >0: if int(select_satue)==2: select_res = self.my_cur.fetchall()# 返回結果為字典 elif int(select_satue)==1: select_res=self.my_cur.fetchone() else: select_res=self.my_cur.fetchmany(num) else: select_res=False self.dispose() return select_res except Exception as e: print('select except ',e.args) self.dispose() return None #釋放資源 def dispose(self): self.my_coon.close() self.my_cur.close()
redis操作
#redis壓縮url def compressed(self,value): sha1obj = hashlib.sha1() sha1obj.update(value.encode('utf-8')) hash_value = sha1obj.hexdigest() return hash_value #redis操作(需要被管道監控的前提),0模式為長度對比去重(利用redis管道特性 鎖住url或關鍵字進行去重,在把資料持久化到MySQL),1模式為分頁計數 (先利用redis管道鎖住page在用MySQL分頁) def redis_operation(self,key,sql_statue,value=None): """ :param sql_statue操作redis的模式 :param r:redis連線 :param value:add的資料或者新增的頁碼數,先壓縮在新增 :param key:專案所使用的集合名稱,建議如下格式: ”projectname:task_remove_repeate“ :return: """ pageindexs=0 while True: try: pipe = self.getredisconn().pipeline() pipe.watch(key) # 開啟事務 pipe.multi() #去重模式,長度對比去重 if sql_statue==0: hash_value = self.compressed(value) long_1 = self.re_con.scard(key) self.re_con.sadd(key, hash_value) long_2 = self.re_con.scard(key) return long_1, long_2 #翻頁模式 elif sql_statue==1: pageindex=self.re_con.get(key) pageindexs += int(value) self.re_con.set(key,pageindexs) return pageindex except Exception as e: print(e) continue #新增到redis中,資料需要去重的情況使用set比較好 #def add_redis(self,url, set_name,r=None): # r.sadd(set_name,url)
redis資料到MySQL
#實時取資料到mysql,超過30分鐘退出(此狀態是資料下載完畢才會有)這個要寫到爬蟲中去。還沒想好如何和上面insert部分結合
def redis_put(self,key):
num=0
while True:
if num>=600:
break
if self.re_con.scard(key)>0:
num=0
h =self.re_con.spop(key)
try:
result=json.loads(str(h,encoding='utf-8').replace("'", "\""))
except Exception as e:
print(e)
self.error_file(h)
continue
else:
num+=1
time.sleep(5)
# #上文json.loads()處理時未明原因報錯,此錯誤線下實驗無問題,此處處理異常跳過,收集出錯項。###錯誤待考證
def error_file(self,h):
with open('elong_error_file.txt','w',encoding="utf-8") as f:
f.write(str(h,encoding='utf-8').replace("'", "\""))
def redis_close(self,pool):
"""
釋放redis連線池
:param pool:
:return:
"""
pool.disconnect()
由於此篇文章還沒寫完,目標了也是封裝程一個sql元件,當然可能會有些雞肋,畢竟還有ORM 這個東西,在加上實際上很多東西是已有的,此處了只是想看能不能滿足工作需求。不完善的地方請多多指教