1. 程式人生 > 實用技巧 >Linux-04-賬號管理

Linux-04-賬號管理

簡單實現基於記憶體的快取引擎,並封裝第三方庫aredis實現redis快取

0.程式碼DEMO

"""
快取元件
"""
import time
import logging
import asyncio
from threading import Lock
from typing import Union, Any, Dict
from aredis import StrictRedis, StrictRedisCluster


class SingletonMeta(type):
    """元類——有限的單例模式
    當初始化引數包含new=True時,將構造一個新的物件
    """
    __instance = None
    __lock = Lock()

    def __call__(cls, *args, **kwargs):
        with cls.__lock:
            new = kwargs.pop('new', None)
            if new is True:
                return super().__call__(*args, **kwargs)
            if not cls.__instance:
                cls.__instance = super().__call__(*args, **kwargs)
        return cls.__instance


class MemoryEngine:
    """本地記憶體作為後端快取引擎,不支援分散式
    只支援get、set方法
    """

    def __init__(self):
        self.namespace = {}
        self._check_time = 0
        self._check_interval = 60

    def delete(self, key: str) -> None:
        """刪除指定快取"""
        if key in self.namespace:
            del self.namespace[key]

    def et_clear(self) -> None:
        """清理超時快取"""
        clear_names = []
        if time.time() > self._check_time + self._check_interval:
            for name, block in self.namespace.items():
                if block.ttl < -1:
                    clear_names.append(name)
        for name in clear_names:
            del self.namespace[name]

    async def ttl(self, name) -> int:
        self.et_clear()
        if name not in self.namespace:
            return -1
        return int(self.namespace[name].ttl)

    async def get(self, name):
        self.et_clear()
        if name not in self.namespace:
            return None
        return self.namespace[name].val

    async def set(self, name, value, ex=None, px=None, nx=False, xx=False):
        if nx and name in self.namespace:
            return
        if xx and name not in self.namespace:
            return
        self.namespace[name] = DataBlock(name, value, ex, px)
        if len(value) > 16384 and (ex or px):
            # 實驗性功能 大容量快取清理機制 避免長時間不使用快取下佔用記憶體
            life = ex if ex else px // 1000
            loop = asyncio.get_event_loop()
            loop.call_later(life * 2, self.delete, name)


class DataBlock:
    """記憶體資料塊 封裝了有效期"""

    def __init__(self, name: str, value: Any, ex: float = None,
                 px: float = None):
        """
        :param name: key名
        :param value: 儲存value
        :param ex: 生命週期,單位秒
        :param px: 生命週期,單位毫秒
        """
        self._name = name
        self._value = value
        self.et = time.time() - 1
        if ex:
            self.et += ex
        if px:
            self.et += (px / 1000)
        if not ex and not px:
            self._ttl = -1

    @property
    def val(self):
        return self._value if self.ttl >= -1 else None

    @property
    def ttl(self):
        if hasattr(self, '_ttl'):
            return self._ttl
        return self.et - time.time()

    def __repr__(self):
        return f'<name={self._name}>'


class Cache(metaclass=SingletonMeta):
    """一個基於redis封裝的非同步快取類,它可以快速方便切換多個快取庫
    Cache類預設使用default快取庫,你可以使用select(db_name)切換其他庫,並且select支援
    鏈式呼叫,但select方法並不會改變原物件指向的default快取庫
    Cache物件通過反射擁有了StrictRedis和StrictRedisCluster類下的所有方法,你可以直接對
    物件執行redis命令,此外Cache還封裝了一個方法execute(command, *args, **kwargs)
    相比於反射方法,使用execute方法會自動對返回資料解碼
    針對字串型別,Cache對get和set方法作了優化,當使用get和set方法時,可以同時傳遞一個序列化器,
    它會查詢和儲存時自動使用序列化器,也就是說你可以使用set方法儲存任意序列化器支援的物件
    """
    logger = logging.getLogger(__name__)

    def __init__(self, config: dict):
        """
        :param config: 快取資料庫字典
        :return: Cache物件
        """
        self._default = 'default'
        self._caches = {}
        serializer = config.pop('serializer', 'ujson')
        try:
            self.serializer = __import__(serializer)
        except:
            self.serializer = __import__('json')
        for key, value in config.items():
            try:
                if value.get('engine') == 'memory':
                    self._caches[key] = MemoryEngine()
                elif 'startup_nodes' in value:
                    self._caches[key] = StrictRedisCluster(**value)
                else:
                    self._caches[key] = StrictRedis(**value)
            except Exception as e:
                self.logger.error(e)

    @property
    def all(self) -> Dict[str, Union[StrictRedis, StrictRedisCluster]]:
        """返回全部快取資料庫"""
        return self._caches

    @property
    def current_db(self) -> Union[StrictRedis, StrictRedisCluster]:
        """返回快取物件指向的快取資料庫"""
        return self._caches[self._default]

    def select(self, name: str = 'default') -> 'Cache':
        """獲取指定快取資料庫
        支援多次鏈式呼叫select方法
        永遠不會改變app所繫結的預設快取資料庫
        :param name: 定義的資料庫名,預設值為"default"
        :return: Cache物件
        """
        if name not in self._caches:
            raise AttributeError(f'Cache database "{name}" not found. '
                                 f'Please check CACHES config in settings')
        obj = Cache(config={}, new=True)
        obj._caches = self._caches
        obj._default = name
        return obj

    async def execute(self, command: str, *args, **kwargs) -> Any:
        """實現結果自解碼
        :param command: 執行的redis原生命令
        :return: 返回redis結果的utf8解碼
        """
        if hasattr(StrictRedis, command):
            result = await getattr(self, command)(*args, **kwargs)
            if result:
                result = result.decode('utf8')
            return result
        else:
            raise getattr(self, command)

    async def get(self, name, serializer=None, **kwargs) -> Any:
        """覆蓋redis的字串get方法,提供序列化能力
        :param name: key
        :param serializer: 使用指定的序列化模組
        :param kwargs: 傳遞給序列化方法
        :return: 返回redis結果的反序列化物件
        """
        if not serializer:
            serializer = self.serializer
        value = await self.current_db.get(name)
        if not value:
            return None
        else:
            if isinstance(value, bytes):
                value = value.decode('utf8')
        try:
            return serializer.loads(value, **kwargs)
        except ValueError:
            return value

    async def set(self, name: str, value: Any, serializer=None,
                  ex=None, px=None, nx=False, xx=False, **kwargs) -> bool:
        """永遠在redis層以string格式儲存,提供反序列化能力
        :param name:
        :param value:
        :param serializer: 使用指定的序列化模組
        :param ex: 設定鍵key的過期時間,單位為秒
        :param px: 設定鍵key的過期時間,單位為毫秒
        :param nx: 只有鍵key不存在的時候才會設定key的值
        :param xx: 只有鍵key存在的時候才會設定key的值
        :param kwargs: 傳遞給反序列化方法
        :return: 執行結果
        """
        if not serializer:
            serializer = self.serializer
        _kwargs = {'ensure_ascii': True}
        _kwargs.update(kwargs)
        if not isinstance(value, str):
            value = serializer.dumps(value, **_kwargs)
        return await self.current_db.set(name, value, ex, px, nx, xx)

    def handle(self, backed: str):
        # TODO 查詢快取庫

        # TODO 執行handler獲取結果

        # TODO 返回結果並存儲至快取
        """提供給檢視方法的裝飾器 它快取檢視方法返回的結果"""

    def __getitem__(self, item) -> 'Cache':
        return self.select(item)

    def __getattr__(self, attr) -> Any:
        return getattr(self.current_db, attr)

1.初始化並繫結web應用

# 快取配置 預設庫key值為default不可更改
# serializer為使用的預設序列化模組,不能使用serializer作為快取資料庫的key值
# 在無顯示指定的情況下,會優先選擇ujson作為序列化模組
# 當指定engine=memory時,使用本地記憶體作為快取,本地記憶體快取只支援get、set方法存取值
config = {
    'serializer': 'ujson',
    'default': {'engine': 'memory'},
    'redis': {'host': 'localhost', 'port': 6379, 'db': 4},
}
app = Sanic()
# 例項化Cache物件並繫結
app.cache = Cache(config)

2.檢視層獲取快取物件

# 方式一,通過app獲取(這裡想吐槽一下,sanic似乎沒有提供一個方法來獲取全域性app物件?)
async def hander(request):
    cache = request.app.cache

# 方式二,由於Cache使用了單例模式,可以通過import匯入Cache並例項化
from cache import Cache
cache = Cache()

3.使用快取

# 1.存取資料
# get和set的儲存過程式列化資料(預設優先ujson)後存入對應快取元件,取出過程則是逆過程(如果使用redis,會自動處理utf8解碼)
# get和set也支援傳入一個serializer引數,用於自定義序列化和反序列化器,詳見原始碼
cache = request.app.cache
data = {'name': '上海'}
# 將data存入快取,命名為mapData,有效期30秒
await cache.set("mapData", data, ex=30)
# 獲取快取中key=mapData的值
val = await cache.get("mapData")

# 2.多快取庫
# 獲取all屬性所有快取庫
print(cache.all)
# 使用select方法指定快取庫 
await cache.select('redis').get('mapData')
# select支援鏈式呼叫
await cache.select('redis').select('default').set('mapData', data)
# 獲取選擇快取庫的儲存引擎
engine: StrictRedis = cache.select('redis').current_db
# 執行儲存引擎所支援的命令
hash = await engine.hget('hashmap')

# 3.由於Cache通過反射機制,可以直接呼叫後端儲存引擎支援的方法,如redis作為後端引擎時
await cache.select('redis').sismember('name', 'value')

# 4.除了redis,簡單實現了一個無序依賴第三方元件的記憶體快取引擎,其實就是通過一個字典來儲存資料,參考DataBlock原始碼
# 這個快取引擎實現了get、set、ttl三個公共介面,詳見MemoryEngine原始碼