Python基於單例模式實現具有時效性的記憶體快取
阿新 • • 發佈:2018-12-22
Python基於單例模式實現具有時效性的記憶體快取
版本說明:Python 2.7
Python有不少第三方的快取庫,如cacheout、memcached等。因為專案需求,這裡不使用第三方庫,自己實現具有時效性的記憶體快取,用來快取重複利用的資料。
1 設計實現
1.1 思路
採用dict()作為快取介質,資料以key、value的形式進行儲存。key為cache_id,用來標識不同的快取資料。value是要進行快取的資料。並且使用單例的設計模式,保障快取資料的原子性。在時效性控制上,對每一個快取資料進行單獨控制,使用threading.Timer進行延時銷燬快取資料。
1.2 設計單例模式
本例子使用__new__
關鍵字實現單例模式。如下所示:
# encoding: utf-8
"""
Created by shirukai on 2018/11/7
"""
import threading
class DataCache(object):
# 新增執行緒鎖,保證多執行緒安全
__instance_lock = threading.Lock()
def __init__(self):
pass
def __new__(cls, *args, **kwargs):
if not hasattr(DataCache, "_instance"):
with DataCache.__instance_lock:
if not hasattr(DataCache, "_instance"):
DataCache._instance = object.__new__(cls)
return DataCache._instance
測試:
if __name__ == '__main__':
for i in xrange(10):
print DataCache()
1.3 時效性控制
在時效性控制上,選用了threading.Timer進行延時執行方法。例如我要延時執行一個列印方法,可以使用如下的程式碼.
# encoding: utf-8
"""
Created by shirukai on 2018/11/7
"""
import threading
import time
def print_str(string):
print string
if __name__ == '__main__':
timer = threading.Timer(2, print_str, ['test timer'])
timer.start()
time.sleep(10)
Timer(延時時間,執行函式,引數)
2 實現程式碼
# encoding: utf-8
"""
Created by shirukai on 2018/11/5
基於單例模式實現具有時效性的記憶體快取
"""
import threading
import uuid
class DataCache(object):
"""
_cache 的資料結構如下所示:
_cache:{
"cache_id_1":{
"value":"cache_value",
"expired":"60s",
"timer":"定時清理器例項",
}
}
"""
# 預設 expired = 2*60*60s
EXPIRED = 2 * 60 * 60
# 新增執行緒鎖,保證多執行緒安全
__instance_lock = threading.Lock()
# 初始化dict()用來快取資料
__CACHE = dict()
def __init__(self):
self.__cache = DataCache.__CACHE
def __new__(cls, *args, **kwargs):
if not hasattr(DataCache, "_instance"):
with DataCache.__instance_lock:
if not hasattr(DataCache, "_instance"):
DataCache._instance = object.__new__(cls)
return DataCache._instance
def set(self, value, cache_id=None, expired=EXPIRED):
"""
儲存快取
:param value: 快取資料
:param cache_id: cache_id 預設值:None
:param expired: 過期時間 預設值:EXPIRED
:return: cache_id
"""
if cache_id is None or cache_id == "":
cache_id = str(uuid.uuid4())
self._set_cache(value, cache_id, expired)
return cache_id
def delete(self, cache_id):
"""
刪除快取
:param cache_id: 快取ID
:return: None
"""
self._clean_cache(cache_id)
def get(self, cache_id):
"""
獲取快取
:param cache_id:快取ID
:return:
"""
if self.__cache.has_key(cache_id):
return self.__cache[cache_id]['value']
else:
return None
def values(self):
"""
獲取所有值
:return: {
“cache_id_1”:"value1",
“cache_id_2”:"value2"
}
"""
return {key: item['value'] for key, item in self.__cache.items()}
def clear(self):
"""
清空快取
:return: None
"""
for cache_id in self.__cache.keys():
self._clean_cache(cache_id)
def _set_cache(self, value, cache_id, expired):
# 刪除快取
self._clean_cache(cache_id)
# 設定時效監控執行緒
timer = threading.Timer(expired, self._clean_cache, [cache_id])
timer.start()
self.__cache[cache_id] = {
'timer': timer,
'value': value
}
def _clean_cache(self, cache_id):
if self.__cache.has_key(cache_id):
timer = self.__cache[cache_id]['timer']
timer.cancel()
self.__cache.pop(cache_id)
3 測試
測試程式碼:
# encoding: utf-8
"""
Created by shirukai on 2018/11/7
"""
from src.api.v1.cache.data_cache import DataCache
import time
if __name__ == '__main__':
cache = DataCache()
# 儲存一個字串,並設定時效性為6秒
cache_id = cache.set(value="test cache!", expired=6)
# 每隔一秒列印一次資料
for i in xrange(10):
print cache.get(cache_id)
time.sleep(1)
效果: