Guava+zookeeper的叢集本地快取小實踐
阿新 • • 發佈:2018-12-12
目錄
零、依賴
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>20.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency>
一、結構
二、邏輯
1.採用guava做本地快取服務
2.採用zookeeper做叢集快取通知
流程:
①修改、刪除資料
②讀取資料(未命中快取如下圖,命中則直接返回)
三、程式碼實現
/***************本地快取*******************/ public class LocalCacheUtil { private static Map<String, Cache> cacheMap = new ConcurrentHashMap<>(); private static final String PATH_SPLIT_CHAR1 = "/"; private static final String PATH_SPLIT_CHAR2 = "\\"; /** * 獲取快取內容 * @param group 分組 * @param key key * @return 快取物件 */ public static Object get(String group,String key){ check(group,"group"); check(key,"key"); Cache cache = cacheMap.get(group); if(cache == null){ return null; } return cache.getIfPresent(key); } /** * 寫快取 * @param group 分組 * @param key key * @param val 值 * @param expire 過期時間,不支援動態傳入 */ public static void put(String group,String key,Object val,Long expire){ check(group,"group"); check(key,"key"); Cache cache = getCache(group,expire); cache.put(key,val); ZkClientUtil.setWatcher(group, key, new WatcherCallBack() { @Override public void call(String group, String key) { Cache cache = cacheMap.get(group); if(cache != null){ cache.invalidate(key); } } }); } /** * 刪除快取 * @param group 分組 * @param key key */ public static void remove(String group,String key){ check(group,"group"); check(key,"key"); Cache cache = cacheMap.get(group); if(cache != null){ cache.invalidate(key); } ZkClientUtil.delete(group,key); } /** * 獲取cache * @param group 分組 * @param expire 超時時間 * @return cache */ private static Cache getCache(String group, Long expire){ Cache cache = cacheMap.get(group); if(cache == null){ cache = CacheBuilder.newBuilder() .maximumSize(2000) .expireAfterWrite(expire, TimeUnit.MINUTES) .build(); cacheMap.put(group,cache); } return cache; } private static void check(String src,String name){ if(src == null || "".equals(src.trim())){ throw new RuntimeException(name +" not empty!"); } //判斷“/”,"\",因為zk是路徑 if(src.contains(PATH_SPLIT_CHAR1) || src.contains(PATH_SPLIT_CHAR2)){ throw new RuntimeException(name +" can not contains \"/\" and \"\\\"!"); } } } /***************回撥*******************/ public interface WatcherCallBack { /** * watcher回撥處理 * @param group 分組 * @param key key */ void call(String group, String key); } /***************zookeeper 處理類*******************/ public class ZkClientUtil { private static final Logger LOGGER = LoggerFactory.getLogger(com.epay.fcc.fses.common.cache.ZkClientUtil.class); /** * zk客戶端 */ private static CuratorFramework cf; private static String appPath; private static final String PATH_SPLIT_CHAR = "/"; private static final String NAME_SPACE ="localcache"; /** * zk初始化方法 * 預設地址/config/cacheZkConfig.properties */ public synchronized static void init(){ //配置檔案地址 String path = "/config/cacheZkConfig.properties"; init(path); } /** * 載入配置檔案路徑 * ClassLoader().getResourceAsStream(path) * @param path */ public synchronized static void init(String path){ //配置檔案地址 CacheZkConfig zkConfig = loadConfig(path); init(zkConfig); } /** * zk初始化方法 */ public synchronized static void init(CacheZkConfig zkConfig){ LOGGER.info("ZkClientUtil init start..."); String path = ""; cf = CuratorFrameworkFactory.builder() .connectString(zkConfig.getConnectString()) .sessionTimeoutMs(zkConfig.getSessionTimeoutMs()) .connectionTimeoutMs(zkConfig.getConnectionTimeoutMs()) .retryPolicy(new ExponentialBackoffRetry( zkConfig.getBaseSleepTimeMs(), zkConfig.getMaxRetries())) .namespace(NAME_SPACE) .build(); cf.start(); appPath = zkConfig.getAppPath(); LOGGER.info("ZkClientUtil init complete..."); } /** * 設定watcher * @param group 分組 * @param key key * @param watcherCallBack watcher回撥 */ public static void setWatcher(String group, String key, WatcherCallBack watcherCallBack) { CuratorWatcher curatorWatcher = new CuratorWatcher() { @Override public void process(WatchedEvent watchedEvent) throws Exception { LOGGER.info("收到監聽事件:"+watchedEvent.toString()); //空事件不處理,只做監控 if(Watcher.Event.EventType.None.equals(watchedEvent.getType())){ return; } String path = watchedEvent.getPath(); String[] pathSplit = splitPath(path); LOGGER.info("GROUP = "+pathSplit[1]+",KEY = "+pathSplit[2]); watcherCallBack.call(pathSplit[1],pathSplit[2]); } }; String path = getPath(appPath,group,key); try { Stat stat = cf.checkExists().forPath(path); if(stat == null){ cf.create().creatingParentsIfNeeded().forPath(path); } cf.checkExists().usingWatcher(curatorWatcher).forPath(path); } catch (Exception e) { LOGGER.error("set watcher fail, path:"+path+":"+e.getMessage()); } } /** * 刪除節點 * @param group 分組 * @param key key */ public static void delete(String group, String key){ String path = getPath(appPath,group,key); try { Stat stat = cf.checkExists().forPath(path); if(stat != null){ cf.delete().guaranteed().forPath(path); } } catch (Exception e) { LOGGER.error("delete zNode fail, path:"+path+":"+e.getMessage()); } } /** * 生成znode路徑 * @param strs 路徑list * @return 路徑 */ private static String getPath(String ... strs){ StringBuffer stringBuffer = new StringBuffer(); for (String str : strs) { stringBuffer.append(PATH_SPLIT_CHAR).append(str.trim()); } return stringBuffer.toString(); } /** * 切割路徑 * @param path 路勁 * @return 路徑list */ private static String[] splitPath(String path){ path = path.substring(1); return path.split(PATH_SPLIT_CHAR); } private static CacheZkConfig loadConfig(String path){ LOGGER.info("ZkClientUtil start load properties start :"+path); Properties properties = new Properties(); // 使用ClassLoader載入properties配置檔案生成對應的輸入流 InputStream in = ZkClientUtil.class.getClassLoader().getResourceAsStream(path); // 使用properties物件載入輸入流 try { properties.load(in); LOGGER.info("ZkClientUtil load properties finish :"+path); }catch (IOException e){ throw new RuntimeException("ZkClientUtil load properties fail :"+path,e); } CacheZkConfig zkConfig = new CacheZkConfig(); zkConfig.setConnectString(properties.getProperty("connectString")); zkConfig.setSessionTimeoutMs(properties.getProperty("sessionTimeoutMs")); zkConfig.setConnectionTimeoutMs(properties.getProperty("connectionTimeoutMs")); zkConfig.setBaseSleepTimeMs(properties.getProperty("baseSleepTimeMs")); zkConfig.setMaxRetries(properties.getProperty("maxRetries")); zkConfig.setAppPath(properties.getProperty("appPath")); return zkConfig; } static class CacheZkConfig{ /** * 連線地址,多個地址用“,”間隔 * 例如:localhost:2181,localhost1:2181 */ private String connectString; /** * session超時時間,單位ms */ private int sessionTimeoutMs = 5000; /** * 連線超時時間,單位ms */ private int connectionTimeoutMs = 5000; /** * 重試機制,間隔時間 */ private int baseSleepTimeMs = 100; /** * 重試機制,重試次數 */ private int maxRetries = 5; /** * 應用路徑,用於隔離znood路徑 */ private String appPath = "defaultAppPath"; public String getConnectString() { return connectString; } public void setConnectString(String connectString) { if(connectString == null || "".equals(connectString.trim())){ throw new RuntimeException("load properties fail,connectString must spacial!!!"); } this.connectString = connectString; } public int getSessionTimeoutMs() { return sessionTimeoutMs; } public void setSessionTimeoutMs(String sessionTimeoutMs) { if(sessionTimeoutMs != null && !"".equals(sessionTimeoutMs.trim())){ this.sessionTimeoutMs = Integer.valueOf(sessionTimeoutMs); } } public int getConnectionTimeoutMs() { return connectionTimeoutMs; } public void setConnectionTimeoutMs(String connectionTimeoutMs) { if(connectionTimeoutMs != null && !"".equals(connectionTimeoutMs.trim())){ this.connectionTimeoutMs = Integer.valueOf(connectionTimeoutMs); } } public int getBaseSleepTimeMs() { return baseSleepTimeMs; } public void setBaseSleepTimeMs(String baseSleepTimeMs) { if(baseSleepTimeMs != null && !"".equals(baseSleepTimeMs.trim())){ this.baseSleepTimeMs = Integer.valueOf(baseSleepTimeMs); } } public int getMaxRetries() { return maxRetries; } public void setMaxRetries(String maxRetries) { if(maxRetries != null && !"".equals(maxRetries.trim())){ this.maxRetries = Integer.valueOf(maxRetries); } } public String getAppPath() { return appPath; } public void setAppPath(String appPath) { if(appPath != null && !"".equals(appPath.trim())){ this.appPath = appPath; } } } }
四、使用
public class LocalCacheTest { //模擬資料庫 private static final Map<String,Map<String,String>> db = new ConcurrentHashMap<>(); //模擬資料庫資料 static{ Map<String,String> map = new HashMap(); map.put("uId","1001"); map.put("name","name_"+"1001"); db.put("1001",map); } public static void main(String[] args) throws InterruptedException { //設定配置 ZkClientUtil.CacheZkConfig cacheZkConfig = new ZkClientUtil.CacheZkConfig(); cacheZkConfig.setConnectString("localhost:2181"); //初始化zk ZkClientUtil.init(cacheZkConfig); //1.測試讀取快取 for (int i = 0 ;i<=1 ;i++) { System.out.println(findUser("1001")); } //2.測試快取失效 // Thread.sleep(60000); System.out.println("--------------"); System.out.println("過期後查詢:"+findUser("1001")); //3.測試快取移除 updateUser("1001","name_2"); System.out.println("更新後查詢:"+findUser("1001")); //4.測試快取移除 deleteUser("1001"); System.out.println("刪除後查詢:"+findUser("1001")); } /** * 讀取快取測試 * @param uId * @return */ private static Map<String,String> findUser(String uId){ String group = "user"; Object o = LocalCacheUtil.get(group,uId); if(o != null){ System.out.println("------->快取命中,key:"+uId); return (Map<String,String>) o; } System.out.println("------->快取未命中,key:"+uId); Map<String,String> user = db.get(uId); if(user != null){ System.out.println("------->放快取,key:"+uId); LocalCacheUtil.put(group,uId,user,1L); } return user; } private static void updateUser(String uId,String name){ String group = "user"; Map<String,String> user = findUser(uId); if(user == null){ throw new RuntimeException("user not exist,uId:"+uId); } System.out.println("------->刪除快取,key:"+uId); LocalCacheUtil.remove(group,uId); user.put("name",name); db.put(uId,user); //如果擔心此期間其他請求重新整理快取,可以在db修改後再remove一次快取(快取雙淘汰) } private static void deleteUser(String uId){ String group = "user"; Map<String,String> user = findUser(uId); if(user == null){ //不存在直接認為成功 return; } System.out.println("------->刪除快取,key:"+uId); LocalCacheUtil.remove(group,uId); db.remove(uId); //如果擔心此期間其他請求重新整理快取,可以在db刪除後再remove一次快取(快取雙淘汰) } }
五、問題
1.叢集同步機制依賴於zookeeper,zk的建立連線是非同步的,連不上也能啟動,會影響叢集通知
2.基於aop註解的沒寫
六、其他
一次簡單的嘗試,歡迎討(來)論(噴),郵箱地址[email protected]