1. 程式人生 > >封裝redis

封裝redis

set sts rop del pass 屬性方法 one 調用 exist

import redis

# r = redis.Redis()

class MyRedis():

def __init__(self,ip,password,port=6379,db=0):

#構造函數 類在實例化的時候會執行。

try:

self.r = redis.Redis(host=ip,password=password,port=port,db=db)

# r = redis.ConnectionPool(host=ip,password=password,port=port,db=db)

except Exception as e:

print(‘redis連接失敗,錯誤信息%s‘%e)

def str_get(self,k):

res = self.r.get(k)

if res:

return res.decode() #bytes類型轉化為字符串。

def str_set(self,k,v,time=None):

self.r.set(k,v,time) # 不出異常不需要

def delete(self,k):

tag = self.r.exists(k) # r.exists 用來判斷這個key是否存在

if tag:

self.r.delete(k)

print(‘刪除成功‘)

else:

print(‘這個key不存在‘)

def hash_get(self,name,k):

res = self.r.hget(name,k)

if res:

return res.decode()

def hash_set(self,name,k,v):

self.r.hset(name,k,v)

def hash_getall(self,name):

data = {}

# {b‘12‘: b‘1212‘, b‘3‘: b‘sdad‘, b‘4‘: b‘asdadsa‘}

res = self.r.hgetall(name) #獲取的結果是一個字典

if res:

for k,v in res.items():

k = k.decode() #對於返回的結果轉化為字符串

v = v.decode()

data[k]=v #加入字典

return data

def hash_del(self,name,k): #嘗試刪除不存在 不報錯 ,所以不需要try

res = self.r.hdel(name,k)

if res:

print(‘刪除成功‘)

return 1 #刪除成功會返回1 返回給該函數。

else:

print(‘刪除失敗,該key不存在‘)

return 0

@property #屬性方法 可以a.clean_redis直接調用

def clean_redis(self):

self.r.flushdb() #清空redis 使用 flushdb()用來清空庫

print(‘清空redis成功!‘)

return 0

####################################

my = MyRedis(‘118.24.xxx‘,‘HK139232‘) #不會報錯 try方法沒效果,其實有一個 redis.ConnectionPool 的方法可以???找一下

my.str_get(‘sss‘)

封裝redis