封裝redis
import redis
# r = redis.Redis()
class MyRedis():
def __init__(self,ip,password,port=6379,db=0):
#構造函數 類在實例化的時候會執行。
try:
self.r = redis.Redis(host=ip,password=password,port=port,db=db)
# r = redis.ConnectionPool(host=ip,password=password,port=port,db=db)
except Exception as e:
print(‘redis連接失敗,錯誤信息%s‘%e)
def str_get(self,k):
res = self.r.get(k)
if res:
return res.decode() #bytes類型轉化為字符串。
def str_set(self,k,v,time=None):
self.r.set(k,v,time) # 不出異常不需要
def delete(self,k):
tag = self.r.exists(k) # r.exists 用來判斷這個key是否存在
if tag:
self.r.delete(k)
print(‘刪除成功‘)
else:
print(‘這個key不存在‘)
def hash_get(self,name,k):
res = self.r.hget(name,k)
if res:
return res.decode()
def hash_set(self,name,k,v):
self.r.hset(name,k,v)
def hash_getall(self,name):
data = {}
# {b‘12‘: b‘1212‘, b‘3‘: b‘sdad‘, b‘4‘: b‘asdadsa‘}
res = self.r.hgetall(name) #獲取的結果是一個字典
if res:
for k,v in res.items():
k = k.decode() #對於返回的結果轉化為字符串
v = v.decode()
data[k]=v #加入字典
return data
def hash_del(self,name,k): #嘗試刪除不存在 不報錯 ,所以不需要try
res = self.r.hdel(name,k)
if res:
print(‘刪除成功‘)
return 1 #刪除成功會返回1 返回給該函數。
else:
print(‘刪除失敗,該key不存在‘)
return 0
@property #屬性方法 可以a.clean_redis直接調用
def clean_redis(self):
self.r.flushdb() #清空redis 使用 flushdb()用來清空庫
print(‘清空redis成功!‘)
return 0
####################################
my = MyRedis(‘118.24.xxx‘,‘HK139232‘) #不會報錯 try方法沒效果,其實有一個 redis.ConnectionPool 的方法可以???找一下
my.str_get(‘sss‘)
封裝redis