Linux-04-賬號管理
阿新 • • 發佈:2020-09-19
簡單實現基於記憶體的快取引擎,並封裝第三方庫aredis實現redis快取
0.程式碼DEMO
""" 快取元件 """ import time import logging import asyncio from threading import Lock from typing import Union, Any, Dict from aredis import StrictRedis, StrictRedisCluster class SingletonMeta(type): """元類——有限的單例模式 當初始化引數包含new=True時,將構造一個新的物件 """ __instance = None __lock = Lock() def __call__(cls, *args, **kwargs): with cls.__lock: new = kwargs.pop('new', None) if new is True: return super().__call__(*args, **kwargs) if not cls.__instance: cls.__instance = super().__call__(*args, **kwargs) return cls.__instance class MemoryEngine: """本地記憶體作為後端快取引擎,不支援分散式 只支援get、set方法 """ def __init__(self): self.namespace = {} self._check_time = 0 self._check_interval = 60 def delete(self, key: str) -> None: """刪除指定快取""" if key in self.namespace: del self.namespace[key] def et_clear(self) -> None: """清理超時快取""" clear_names = [] if time.time() > self._check_time + self._check_interval: for name, block in self.namespace.items(): if block.ttl < -1: clear_names.append(name) for name in clear_names: del self.namespace[name] async def ttl(self, name) -> int: self.et_clear() if name not in self.namespace: return -1 return int(self.namespace[name].ttl) async def get(self, name): self.et_clear() if name not in self.namespace: return None return self.namespace[name].val async def set(self, name, value, ex=None, px=None, nx=False, xx=False): if nx and name in self.namespace: return if xx and name not in self.namespace: return self.namespace[name] = DataBlock(name, value, ex, px) if len(value) > 16384 and (ex or px): # 實驗性功能 大容量快取清理機制 避免長時間不使用快取下佔用記憶體 life = ex if ex else px // 1000 loop = asyncio.get_event_loop() loop.call_later(life * 2, self.delete, name) class DataBlock: """記憶體資料塊 封裝了有效期""" def __init__(self, name: str, value: Any, ex: float = None, px: float = None): """ :param name: key名 :param value: 儲存value :param ex: 生命週期,單位秒 :param px: 生命週期,單位毫秒 """ self._name = name self._value = value self.et = time.time() - 1 if ex: self.et += ex if px: self.et += (px / 1000) if not ex and not px: self._ttl = -1 @property def val(self): return self._value if self.ttl >= -1 else None @property def ttl(self): if hasattr(self, '_ttl'): return self._ttl return self.et - time.time() def __repr__(self): return f'<name={self._name}>' class Cache(metaclass=SingletonMeta): """一個基於redis封裝的非同步快取類,它可以快速方便切換多個快取庫 Cache類預設使用default快取庫,你可以使用select(db_name)切換其他庫,並且select支援 鏈式呼叫,但select方法並不會改變原物件指向的default快取庫 Cache物件通過反射擁有了StrictRedis和StrictRedisCluster類下的所有方法,你可以直接對 物件執行redis命令,此外Cache還封裝了一個方法execute(command, *args, **kwargs) 相比於反射方法,使用execute方法會自動對返回資料解碼 針對字串型別,Cache對get和set方法作了優化,當使用get和set方法時,可以同時傳遞一個序列化器, 它會查詢和儲存時自動使用序列化器,也就是說你可以使用set方法儲存任意序列化器支援的物件 """ logger = logging.getLogger(__name__) def __init__(self, config: dict): """ :param config: 快取資料庫字典 :return: Cache物件 """ self._default = 'default' self._caches = {} serializer = config.pop('serializer', 'ujson') try: self.serializer = __import__(serializer) except: self.serializer = __import__('json') for key, value in config.items(): try: if value.get('engine') == 'memory': self._caches[key] = MemoryEngine() elif 'startup_nodes' in value: self._caches[key] = StrictRedisCluster(**value) else: self._caches[key] = StrictRedis(**value) except Exception as e: self.logger.error(e) @property def all(self) -> Dict[str, Union[StrictRedis, StrictRedisCluster]]: """返回全部快取資料庫""" return self._caches @property def current_db(self) -> Union[StrictRedis, StrictRedisCluster]: """返回快取物件指向的快取資料庫""" return self._caches[self._default] def select(self, name: str = 'default') -> 'Cache': """獲取指定快取資料庫 支援多次鏈式呼叫select方法 永遠不會改變app所繫結的預設快取資料庫 :param name: 定義的資料庫名,預設值為"default" :return: Cache物件 """ if name not in self._caches: raise AttributeError(f'Cache database "{name}" not found. ' f'Please check CACHES config in settings') obj = Cache(config={}, new=True) obj._caches = self._caches obj._default = name return obj async def execute(self, command: str, *args, **kwargs) -> Any: """實現結果自解碼 :param command: 執行的redis原生命令 :return: 返回redis結果的utf8解碼 """ if hasattr(StrictRedis, command): result = await getattr(self, command)(*args, **kwargs) if result: result = result.decode('utf8') return result else: raise getattr(self, command) async def get(self, name, serializer=None, **kwargs) -> Any: """覆蓋redis的字串get方法,提供序列化能力 :param name: key :param serializer: 使用指定的序列化模組 :param kwargs: 傳遞給序列化方法 :return: 返回redis結果的反序列化物件 """ if not serializer: serializer = self.serializer value = await self.current_db.get(name) if not value: return None else: if isinstance(value, bytes): value = value.decode('utf8') try: return serializer.loads(value, **kwargs) except ValueError: return value async def set(self, name: str, value: Any, serializer=None, ex=None, px=None, nx=False, xx=False, **kwargs) -> bool: """永遠在redis層以string格式儲存,提供反序列化能力 :param name: :param value: :param serializer: 使用指定的序列化模組 :param ex: 設定鍵key的過期時間,單位為秒 :param px: 設定鍵key的過期時間,單位為毫秒 :param nx: 只有鍵key不存在的時候才會設定key的值 :param xx: 只有鍵key存在的時候才會設定key的值 :param kwargs: 傳遞給反序列化方法 :return: 執行結果 """ if not serializer: serializer = self.serializer _kwargs = {'ensure_ascii': True} _kwargs.update(kwargs) if not isinstance(value, str): value = serializer.dumps(value, **_kwargs) return await self.current_db.set(name, value, ex, px, nx, xx) def handle(self, backed: str): # TODO 查詢快取庫 # TODO 執行handler獲取結果 # TODO 返回結果並存儲至快取 """提供給檢視方法的裝飾器 它快取檢視方法返回的結果""" def __getitem__(self, item) -> 'Cache': return self.select(item) def __getattr__(self, attr) -> Any: return getattr(self.current_db, attr)
1.初始化並繫結web應用
# 快取配置 預設庫key值為default不可更改 # serializer為使用的預設序列化模組,不能使用serializer作為快取資料庫的key值 # 在無顯示指定的情況下,會優先選擇ujson作為序列化模組 # 當指定engine=memory時,使用本地記憶體作為快取,本地記憶體快取只支援get、set方法存取值 config = { 'serializer': 'ujson', 'default': {'engine': 'memory'}, 'redis': {'host': 'localhost', 'port': 6379, 'db': 4}, } app = Sanic() # 例項化Cache物件並繫結 app.cache = Cache(config)
2.檢視層獲取快取物件
# 方式一,通過app獲取(這裡想吐槽一下,sanic似乎沒有提供一個方法來獲取全域性app物件?)
async def hander(request):
cache = request.app.cache
# 方式二,由於Cache使用了單例模式,可以通過import匯入Cache並例項化
from cache import Cache
cache = Cache()
3.使用快取
# 1.存取資料 # get和set的儲存過程式列化資料(預設優先ujson)後存入對應快取元件,取出過程則是逆過程(如果使用redis,會自動處理utf8解碼) # get和set也支援傳入一個serializer引數,用於自定義序列化和反序列化器,詳見原始碼 cache = request.app.cache data = {'name': '上海'} # 將data存入快取,命名為mapData,有效期30秒 await cache.set("mapData", data, ex=30) # 獲取快取中key=mapData的值 val = await cache.get("mapData") # 2.多快取庫 # 獲取all屬性所有快取庫 print(cache.all) # 使用select方法指定快取庫 await cache.select('redis').get('mapData') # select支援鏈式呼叫 await cache.select('redis').select('default').set('mapData', data) # 獲取選擇快取庫的儲存引擎 engine: StrictRedis = cache.select('redis').current_db # 執行儲存引擎所支援的命令 hash = await engine.hget('hashmap') # 3.由於Cache通過反射機制,可以直接呼叫後端儲存引擎支援的方法,如redis作為後端引擎時 await cache.select('redis').sismember('name', 'value') # 4.除了redis,簡單實現了一個無序依賴第三方元件的記憶體快取引擎,其實就是通過一個字典來儲存資料,參考DataBlock原始碼 # 這個快取引擎實現了get、set、ttl三個公共介面,詳見MemoryEngine原始碼