1. 程式人生 > Guava+zookeeper的叢集本地快取小實踐

Guava+zookeeper的叢集本地快取小實踐

目錄

零、依賴

一、結構

二、邏輯

三、程式碼實現

四、使用

五、問題

六、其他

零、依賴

<!-- Guava: provides the Cache / CacheBuilder classes used for the in-process (local) cache. -->
<dependency>

    <groupId>com.google.guava</groupId>

    <artifactId>guava</artifactId>

    <version>20.0</version>

</dependency>

<!-- Apache Curator: ZooKeeper client library; the code below uses CuratorFramework,
     CuratorFrameworkFactory, CuratorWatcher and ExponentialBackoffRetry.
     NOTE(review): presumably curator-recipes pulls in curator-framework transitively - verify. -->
<dependency>

    <groupId>org.apache.curator</groupId>

    <artifactId>curator-recipes</artifactId>

    <version>4.0.1</version>

</dependency>

<!-- ZooKeeper client classes referenced directly by the code (Watcher, WatchedEvent, Stat). -->
<dependency>

    <groupId>org.apache.zookeeper</groupId>

    <artifactId>zookeeper</artifactId>

    <version>3.4.12</version>

</dependency>

一、結構

二、邏輯

1.採用guava做本地快取服務

2.採用zookeeper做叢集快取通知

流程:

    ①修改、刪除資料

    ②讀取資料(未命中快取如下圖,命中則直接返回)

三、程式碼實現

/***************本地快取*******************/
/**
 * Local (in-process) cache facade backed by one Guava {@code Cache} per group.
 *
 * <p>Every {@link #put} also registers a ZooKeeper watcher through
 * {@code ZkClientUtil.setWatcher}; when that watcher fires, the local entry is
 * invalidated. {@link #remove} invalidates the local entry and deletes the
 * corresponding znode through {@code ZkClientUtil.delete}.
 *
 * <p>Group names and keys become znode path segments, so they must be non-blank
 * and must not contain {@code /} or {@code \}.
 */
public class LocalCacheUtil {

    /** One cache per group name, created lazily on the first {@link #put} for that group. */
    private static final Map<String, Cache<String, Object>> cacheMap = new ConcurrentHashMap<>();

    private static final String PATH_SPLIT_CHAR1 = "/";
    private static final String PATH_SPLIT_CHAR2 = "\\";

    /**
     * Reads a value from the local cache.
     *
     * @param group cache group
     * @param key   cache key
     * @return the cached object, or {@code null} when the group has no cache yet
     *         or the key is not present
     * @throws IllegalArgumentException if {@code group} or {@code key} is blank
     *         or contains a path separator
     */
    public static Object get(String group,String key){
        check(group,"group");
        check(key,"key");
        Cache<String, Object> cache = cacheMap.get(group);
        return cache == null ? null : cache.getIfPresent(key);
    }

    /**
     * Writes a value to the local cache and registers a watcher that invalidates
     * the entry when the watcher callback is invoked.
     *
     * @param group  cache group
     * @param key    cache key
     * @param val    value to cache
     * @param expire expire-after-write time in minutes; it is only applied when the
     *               cache for {@code group} is created, later values are ignored
     * @throws IllegalArgumentException if {@code group} or {@code key} is invalid, or
     *         if {@code expire} is {@code null} while the group's cache must be created
     */
    public static void put(String group,String key,Object val,Long expire){
        check(group,"group");
        check(key,"key");
        getCache(group,expire).put(key,val);
        ZkClientUtil.setWatcher(group, key, LocalCacheUtil::invalidateLocal);
    }

    /**
     * Removes an entry from the local cache and deletes its znode.
     *
     * @param group cache group
     * @param key   cache key
     * @throws IllegalArgumentException if {@code group} or {@code key} is blank
     *         or contains a path separator
     */
    public static void remove(String group,String key){
        check(group,"group");
        check(key,"key");
        invalidateLocal(group,key);
        ZkClientUtil.delete(group,key);
    }

    /** Invalidates {@code key} in the local cache of {@code group}, if that cache exists. */
    private static void invalidateLocal(String group, String key){
        Cache<String, Object> cache = cacheMap.get(group);
        if(cache != null){
            cache.invalidate(key);
        }
    }

    /**
     * Returns the cache of a group, creating it on first use.
     *
     * <p>Creation goes through {@code computeIfAbsent} so that two threads writing to a
     * new group at the same time share one cache instead of overwriting each other's
     * instance (which would silently drop the entry written to the losing instance).
     *
     * @param group  cache group
     * @param expire expire-after-write time in minutes, used only on creation
     * @return the cache for {@code group}
     */
    private static Cache<String, Object> getCache(String group, Long expire){
        return cacheMap.computeIfAbsent(group, ignored -> newCache(expire));
    }

    /** Builds a cache holding at most 2000 entries that expire {@code expire} minutes after write. */
    private static Cache<String, Object> newCache(Long expire){
        if(expire == null){
            // Fail with a clear message instead of an unboxing NullPointerException.
            throw new IllegalArgumentException("expire not empty!");
        }
        return CacheBuilder.newBuilder()
                .maximumSize(2000)
                .expireAfterWrite(expire, TimeUnit.MINUTES)
                .build();
    }

    /**
     * Validates a group name or key.
     *
     * @param src  value to validate
     * @param name parameter name used in the error message
     * @throws IllegalArgumentException if {@code src} is null/blank or contains
     *         {@code /} or {@code \}
     */
    private static void check(String src,String name){
        if(src == null || src.trim().isEmpty()){
            throw new IllegalArgumentException(name +" not empty!");
        }
        // "/" and "\" are rejected because the value is used as a ZooKeeper path segment.
        if(src.contains(PATH_SPLIT_CHAR1) || src.contains(PATH_SPLIT_CHAR2)){
            throw new IllegalArgumentException(name +" can not contains \"/\" and \"\\\"!");
        }
    }
}


/***************回撥*******************/
/**
 * Callback invoked when a watcher registered for a cache entry fires.
 *
 * <p>The interface has exactly one abstract method, so implementations may be
 * written as lambdas or method references.
 */
@FunctionalInterface
public interface WatcherCallBack {

    /**
     * Handles a watcher notification for one cache entry.
     *
     * @param group cache group the notification refers to
     * @param key   cache key the notification refers to
     */
    void call(String group, String key);
}

/***************zookeeper 處理類*******************/
/**
 * ZooKeeper helper used by the local cache to notify other instances.
 *
 * <p>Each cache entry maps to the znode {@code /<appPath>/<group>/<key>} inside the
 * {@code localcache} namespace. One of the {@code init} methods must be called before
 * {@link #setWatcher} or {@link #delete}: the client and the application path are
 * only assigned there.
 */
public class ZkClientUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZkClientUtil.class);

    /**
     * ZooKeeper client. Written inside the synchronized {@code init} and read by the
     * unsynchronized {@link #setWatcher} / {@link #delete}, hence volatile.
     */
    private static volatile CuratorFramework cf;

    /** First path segment, used to keep the znodes of different applications apart. */
    private static volatile String appPath;

    private static final String PATH_SPLIT_CHAR = "/";

    private static final String NAME_SPACE ="localcache";

    /**
     * Initializes the client from the default configuration resource
     * {@code /config/cacheZkConfig.properties}.
     */
    public synchronized static void init(){
        String path = "/config/cacheZkConfig.properties";
        init(path);
    }

    /**
     * Initializes the client from a properties resource loaded through the class loader.
     *
     * @param path class-path resource name of the properties file
     * @throws RuntimeException if the resource cannot be found or read
     */
    public synchronized static void init(String path){
        CacheZkConfig zkConfig = loadConfig(path);
        init(zkConfig);
    }

    /**
     * Builds and starts the Curator client from the given configuration.
     *
     * @param zkConfig connection settings and application path
     */
    public synchronized static void init(CacheZkConfig zkConfig){
        LOGGER.info("ZkClientUtil init start...");
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zkConfig.getConnectString())
                .sessionTimeoutMs(zkConfig.getSessionTimeoutMs())
                .connectionTimeoutMs(zkConfig.getConnectionTimeoutMs())
                .retryPolicy(new ExponentialBackoffRetry(
                        zkConfig.getBaseSleepTimeMs(),
                        zkConfig.getMaxRetries()))
                .namespace(NAME_SPACE)
                .build();
        client.start();
        // Publish the path before the client so a reader that sees the client also sees the path.
        appPath = zkConfig.getAppPath();
        cf = client;
        LOGGER.info("ZkClientUtil init complete...");
    }

    /**
     * Ensures the znode for {@code group}/{@code key} exists and registers a watcher on it.
     * Failures are logged and not rethrown.
     *
     * @param group           cache group
     * @param key             cache key
     * @param watcherCallBack callback invoked with {@code group} and {@code key} when the
     *                        watcher receives an event whose type is not {@code None}
     */
    public static void setWatcher(String group, String key, WatcherCallBack watcherCallBack)  {

        CuratorWatcher curatorWatcher = new CuratorWatcher() {
            @Override
            public void process(WatchedEvent watchedEvent) throws Exception {
                LOGGER.info("收到監聽事件:"+watchedEvent.toString());
                // Events of type None are only logged, never dispatched.
                if(Watcher.Event.EventType.None.equals(watchedEvent.getType())){
                    return;
                }
                // Use the group/key this watcher was registered for instead of parsing them
                // out of the event path: fixed path indexes break when appPath has more than
                // one segment, and the path holds trimmed values that may not equal the cache key.
                LOGGER.info("GROUP = "+group+",KEY = "+key);
                watcherCallBack.call(group,key);
            }
        };
        String path = getPath(appPath,group,key);
        try {
            CuratorFramework client = cf;
            if(client.checkExists().forPath(path) == null){
                createIfAbsent(client, path);
            }
            client.checkExists().usingWatcher(curatorWatcher).forPath(path);
        } catch (Exception e) {
            LOGGER.error("set watcher fail, path:"+path+":"+e.getMessage(), e);
        }
    }

    /**
     * Creates {@code path} (with parents), tolerating the case where the node exists
     * after the create attempt failed, e.g. because another instance created it between
     * the existence check and the create call. Without this the watcher would never be set.
     */
    private static void createIfAbsent(CuratorFramework client, String path) throws Exception {
        try {
            client.create().creatingParentsIfNeeded().forPath(path);
        } catch (Exception createFailure) {
            if(client.checkExists().forPath(path) == null){
                throw createFailure;
            }
        }
    }

    /**
     * Deletes the znode for {@code group}/{@code key} if it exists.
     * Failures are logged and not rethrown.
     *
     * @param group cache group
     * @param key   cache key
     */
    public static void delete(String group, String key){
        String path = getPath(appPath,group,key);
        try {
            CuratorFramework client = cf;
            Stat stat = client.checkExists().forPath(path);
            if(stat != null){
                client.delete().guaranteed().forPath(path);
            }
        } catch (Exception e) {
            LOGGER.error("delete zNode fail, path:"+path+":"+e.getMessage(), e);
        }
    }

    /**
     * Builds a znode path by prefixing every trimmed segment with {@code /}.
     *
     * @param strs path segments
     * @return the joined path
     */
    private static String getPath(String ... strs){
        StringBuilder joined = new StringBuilder();
        for (String str : strs) {
            joined.append(PATH_SPLIT_CHAR).append(str.trim());
        }
        return joined.toString();
    }

    /**
     * Loads the ZooKeeper settings from a properties resource.
     *
     * @param path class-path resource name
     * @return the populated configuration
     * @throws RuntimeException if the resource is missing or cannot be read
     */
    private static CacheZkConfig loadConfig(String path){
        LOGGER.info("ZkClientUtil start load properties start :"+path);
        Properties properties = new Properties();
        // try-with-resources closes the stream, which was previously leaked.
        try (InputStream in = openResource(path)) {
            if(in == null){
                throw new RuntimeException("ZkClientUtil load properties fail :"+path);
            }
            properties.load(in);
            LOGGER.info("ZkClientUtil load properties finish :"+path);
        }catch (IOException e){
            throw new RuntimeException("ZkClientUtil load properties fail :"+path,e);
        }
        CacheZkConfig zkConfig = new CacheZkConfig();
        zkConfig.setConnectString(properties.getProperty("connectString"));
        zkConfig.setSessionTimeoutMs(properties.getProperty("sessionTimeoutMs"));
        zkConfig.setConnectionTimeoutMs(properties.getProperty("connectionTimeoutMs"));
        zkConfig.setBaseSleepTimeMs(properties.getProperty("baseSleepTimeMs"));
        zkConfig.setMaxRetries(properties.getProperty("maxRetries"));
        zkConfig.setAppPath(properties.getProperty("appPath"));
        return zkConfig;
    }

    /**
     * Opens a class-path resource. The name is tried as given first; if that fails and
     * the name starts with {@code /}, it is retried without the leading slashes, because
     * class loader resource names are not expected to start with a slash.
     *
     * @return the stream, or {@code null} when the resource is not found
     */
    private static InputStream openResource(String path){
        ClassLoader loader = ZkClientUtil.class.getClassLoader();
        InputStream in = loader.getResourceAsStream(path);
        if(in == null && path.startsWith(PATH_SPLIT_CHAR)){
            String stripped = path;
            while (stripped.startsWith(PATH_SPLIT_CHAR)) {
                stripped = stripped.substring(1);
            }
            in = loader.getResourceAsStream(stripped);
        }
        return in;
    }

    /**
     * Connection settings for {@link ZkClientUtil}. The String-based setters keep the
     * default when the supplied value is null or blank.
     */
    static class CacheZkConfig{
        /**
         * Connect string; separate multiple addresses with ",",
         * e.g. localhost:2181,localhost1:2181
         */
        private String connectString;
        /**
         * Session timeout in milliseconds.
         */
        private int sessionTimeoutMs = 5000;
        /**
         * Connection timeout in milliseconds.
         */
        private int connectionTimeoutMs = 5000;
        /**
         * Retry policy: base sleep time between retries, in milliseconds.
         */
        private int baseSleepTimeMs = 100;
        /**
         * Retry policy: maximum number of retries.
         */
        private int maxRetries = 5;
        /**
         * Application path, used to isolate the znode paths of different applications.
         */
        private String appPath = "defaultAppPath";

        /** Returns true when {@code value} is neither null nor blank. */
        private static boolean hasText(String value){
            return value != null && !value.trim().isEmpty();
        }

        public String getConnectString() {
            return connectString;
        }

        /**
         * @param connectString ZooKeeper connect string
         * @throws RuntimeException if the value is null or blank
         */
        public void setConnectString(String connectString) {
            if(!hasText(connectString)){
                throw new RuntimeException("load properties fail,connectString must spacial!!!");
            }
            this.connectString = connectString;
        }

        public int getSessionTimeoutMs() {
            return sessionTimeoutMs;
        }

        /**
         * @param sessionTimeoutMs timeout as text; surrounding whitespace is ignored
         * @throws NumberFormatException if the trimmed value is not an integer
         */
        public void setSessionTimeoutMs(String sessionTimeoutMs) {
            if(hasText(sessionTimeoutMs)){
                // Parse the trimmed text: values read from a properties file may carry trailing whitespace.
                this.sessionTimeoutMs = Integer.parseInt(sessionTimeoutMs.trim());
            }
        }

        public int getConnectionTimeoutMs() {
            return connectionTimeoutMs;
        }

        /**
         * @param connectionTimeoutMs timeout as text; surrounding whitespace is ignored
         * @throws NumberFormatException if the trimmed value is not an integer
         */
        public void setConnectionTimeoutMs(String connectionTimeoutMs) {
            if(hasText(connectionTimeoutMs)){
                this.connectionTimeoutMs = Integer.parseInt(connectionTimeoutMs.trim());
            }
        }

        public int getBaseSleepTimeMs() {
            return baseSleepTimeMs;
        }

        /**
         * @param baseSleepTimeMs sleep time as text; surrounding whitespace is ignored
         * @throws NumberFormatException if the trimmed value is not an integer
         */
        public void setBaseSleepTimeMs(String baseSleepTimeMs) {
            if(hasText(baseSleepTimeMs)){
                this.baseSleepTimeMs = Integer.parseInt(baseSleepTimeMs.trim());
            }
        }

        public int getMaxRetries() {
            return maxRetries;
        }

        /**
         * @param maxRetries retry count as text; surrounding whitespace is ignored
         * @throws NumberFormatException if the trimmed value is not an integer
         */
        public void setMaxRetries(String maxRetries) {
            if(hasText(maxRetries)){
                this.maxRetries = Integer.parseInt(maxRetries.trim());
            }
        }

        public String getAppPath() {
            return appPath;
        }

        /**
         * @param appPath application path; ignored when null or blank
         */
        public void setAppPath(String appPath) {
            if(hasText(appPath)){
                this.appPath = appPath;
            }
        }
    }
}

四、使用

/**
 * Console demo of {@code LocalCacheUtil}: reads a user through the cache, then
 * updates and deletes it, printing whether each lookup hit the cache.
 * Requires a ZooKeeper server reachable at {@code localhost:2181}.
 */
public class LocalCacheTest {
    /** Cache group shared by all user lookups in this demo. */
    private static final String GROUP = "user";

    /** In-memory stand-in for a database table, keyed by user id. */
    private static final Map<String,Map<String,String>> db = new ConcurrentHashMap<>();

    // Seed the fake database with one user.
    static{
        Map<String,String> map = new HashMap<>();
        map.put("uId","1001");
        map.put("name","name_"+"1001");
        db.put("1001",map);
    }


    public static void main(String[] args) throws InterruptedException {
        // Build the configuration in code instead of loading a properties file.
        ZkClientUtil.CacheZkConfig cacheZkConfig = new ZkClientUtil.CacheZkConfig();
        cacheZkConfig.setConnectString("localhost:2181");
        // Start the ZooKeeper client.
        ZkClientUtil.init(cacheZkConfig);

        // 1. Read twice: the first lookup misses and fills the cache, the second one hits.
        for (int i = 0 ;i<=1 ;i++) {
            System.out.println(findUser("1001"));
        }
        // 2. Expiry check. The wait is disabled to keep the demo fast; enable it to let
        //    the 1-minute entry expire before the next lookup.
//        Thread.sleep(60000);
        System.out.println("--------------");
        System.out.println("過期後查詢:"+findUser("1001"));
        // 3. Update: the cache entry is removed, so the next lookup reloads from db.
        updateUser("1001","name_2");
        System.out.println("更新後查詢:"+findUser("1001"));
        // 4. Delete: the cache entry and the db row are both removed.
        deleteUser("1001");
        System.out.println("刪除後查詢:"+findUser("1001"));
    }

    /**
     * Looks a user up through the cache, loading it from {@code db} on a miss.
     *
     * @param uId user id
     * @return the user, or {@code null} when it is neither cached nor in {@code db}
     */
    private static Map<String,String> findUser(String uId){
        Object cached = LocalCacheUtil.get(GROUP,uId);
        if(cached != null){
            System.out.println("------->快取命中,key:"+uId);
            // Safe: this class only ever stores Map<String,String> values in GROUP (see the put below).
            @SuppressWarnings("unchecked")
            Map<String,String> hit = (Map<String,String>) cached;
            return hit;
        }
        System.out.println("------->快取未命中,key:"+uId);
        Map<String,String> user = db.get(uId);
        if(user != null){
            System.out.println("------->放快取,key:"+uId);
            LocalCacheUtil.put(GROUP,uId,user,1L);
        }
        return user;
    }

    /**
     * Renames a user: evicts the cache entry, then writes the new name to {@code db}.
     *
     * @param uId  user id
     * @param name new name
     * @throws RuntimeException if the user does not exist
     */
    private static void updateUser(String uId,String name){
        Map<String,String> user = findUser(uId);
        if(user == null){
            throw new RuntimeException("user not exist,uId:"+uId);
        }
        System.out.println("------->刪除快取,key:"+uId);
        LocalCacheUtil.remove(GROUP,uId);
        user.put("name",name);
        db.put(uId,user);
        // If another request may refill the cache in between, evict once more after
        // the db write (double eviction).
    }

    /**
     * Deletes a user: evicts the cache entry, then removes the row from {@code db}.
     * A missing user is treated as success.
     *
     * @param uId user id
     */
    private static void deleteUser(String uId){
        Map<String,String> user = findUser(uId);
        if(user == null){
            // Nothing to delete.
            return;
        }
        System.out.println("------->刪除快取,key:"+uId);
        LocalCacheUtil.remove(GROUP,uId);
        db.remove(uId);
        // If another request may refill the cache in between, evict once more after
        // the db delete (double eviction).
    }

}

五、問題

1.叢集同步機制依賴於zookeeper。zk的連線是非同步建立的,即使連不上zk,應用也能正常啟動,但此時叢集間的快取失效通知將無法送達

2.尚未實現基於aop註解的使用方式

六、其他

一次簡單的嘗試,歡迎討(來)論(噴),郵箱地址[email protected]