zookeeper實現配置中心
阿新 • • 發佈:2019-01-22
最近做新系統,搞了個配置中心。本地用ehcache,配置最終儲存在zookeeper上。系統啟動連線zk,檢查指定節點目錄是否存在。若不存在,則建立。隨後設定監聽。節點內容變化則觸發監聽程式,更新資料到本地快取。讀資料時本地快取不命中,則讀取zk對應路徑節點內容到本地。
相關程式碼示例如下:
1.連線zk邏輯
private static ZooKeeper getInstance(){ if (zooKeeper == null) { try { if (INSTANCE_INIT_LOCK.tryLock(2, TimeUnit.SECONDS)) { try { zooKeeper = new ZooKeeper(Environment.ZK_ADDRESS, 20000, null); // init cfg root path ConfZkClient.createWithParent(Environment.CONF_DATA_PATH); zooKeeper.register(new Watcher() { @Override public voidprocess(WatchedEvent watchedEvent) { try { logger.info("confcenter: watcher:{}", watchedEvent); // session expire, close old and create new if (watchedEvent.getState() == Event.KeeperState.Expired) { zooKeeper.close(); zooKeeper = null; getInstance(); } String path = watchedEvent.getPath(); String key = pathToKey(path); if (key != null) { // add One-time trigger zooKeeper.exists(path, true); if (watchedEvent.getType() == Event.EventType.NodeDeleted) { ConfClient.remove(key); } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { String data = getPathDataByKey(key); ConfClient.update(key, data); } } } catch (KeeperException e) { logger.error("confcenter KeeperException:"+ e.getMessage(), e); } catch (InterruptedException e) { logger.error("confcenter InterruptedException:" +e.getMessage(), e); } } }); } finally { INSTANCE_INIT_LOCK.unlock(); } } } catch (InterruptedException e) { logger.error("confcenter init InterruptedException!" + e.getMessage(), e); } catch (IOException e) { logger.error("confcenter init IOException!" + e.getMessage(), e); } } if (zooKeeper == null) { throw new NullPointerException("confcenter ConfZkClient.zooKeeper is null."); } return zooKeeper; }
2.重新初始化配置載入邏輯
public class ConfPropertyPlaceholderConfigurer extends PropertyPlaceholderConfigurer { private static Logger log = LoggerFactory.getLogger(ConfPropertyPlaceholderConfigurer.class); private String beanName; private BeanFactory beanFactory; /** * @param beanFactoryToProcess * @param props * @throws BeansException */ @Override protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException { // 初始化StringValueResolver StringValueResolver valueResolver = new StringValueResolver() { String placeholderPrefix = "${"; String placeholderSuffix = "}"; @Override public String resolveStringValue(String strVal) { StringBuffer buf = new StringBuffer(strVal); // 如果value值匹配'${***}' boolean start = strVal.startsWith(placeholderPrefix); boolean end = strVal.endsWith(placeholderSuffix); while (start && end) { // 替換value值,${***} -> ***,例如${default.key01} -> default.key01 String key = buf.substring(placeholderPrefix.length(), buf.length() - placeholderSuffix.length()); // 獲取value,從properties、cache、zookeeper String zkValue = ConfClient.get(key, ""); buf = new StringBuffer(zkValue); start = buf.toString().startsWith(placeholderPrefix); end = buf.toString().endsWith(placeholderSuffix); } return buf.toString(); } }; // init bean define visitor BeanDefinitionVisitor visitor = new BeanDefinitionVisitor(valueResolver); // 獲取所有的bean定義,替換佔位符 String[] beanNames = beanFactoryToProcess.getBeanDefinitionNames(); if (beanNames != null && beanNames.length > 0) { for (String beanName : beanNames) { if (!(beanName.equals(this.beanName) && beanFactoryToProcess.equals(this.beanFactory))) { BeanDefinition bd = beanFactoryToProcess.getBeanDefinition(beanName); visitor.visitBeanDefinition(bd); } } } }
3.EhCache邏輯
public class ConfClient { private static Logger logger = LoggerFactory.getLogger(ConfClient.class); public static Properties localProp = PropertiesUtil.loadProperties("confcenter.properties"); private static Cache cache; static { // default use ehcche.xml under src CacheManager manager = CacheManager.create(); /** * name:快取名稱 * maxElementsInMemory:快取最大個數 * overflowToDisk:當記憶體中物件數量達到maxElementsInMemory時,ehcache將會物件寫到磁碟中。 * eternal:物件是否永久有效,一但設定了,timeout將不起作用。 * timeToLiveSeconds:設定物件在失效前允許存活時間(單位:秒)。最大時間介於建立時間和失效時間之間。僅當eternal=false物件不是永久有效時使用,預設是0.,也就是物件存活時間無窮大 * timeToIdleSeconds:設定物件在失效前的允許閒置時間(單位:秒)。僅當eternal=false物件不是永久有效時使用,可選屬性,預設值是0,也就是可閒置時間無窮大。 * */ cache = new Cache(Environment.CONF_DATA_PATH, 10000, false, true, 1800, 1800); manager.addCache(cache); } /** * 初始化配置 * * @param key * @param value */ public static void set(String key, String value) { if (cache != null) { logger.info("初始化配置: [{}:{}]", new Object[]{key, value}); cache.put(new Element(key, value)); } } /** * 更新配置 * * @param key * @param value */ public static void update(String key, String value) { if (cache != null) { if (cache.get(key)!=null) { logger.info("更新配置: [{}:{}]", new Object[]{key, value}); cache.put(new Element(key, value)); } } } /** * 獲取配置 * * @param key * @param defaultVal * @return */ public static String get(String key, String defaultVal) { if (localProp != null && localProp.containsKey(key)) { return localProp.getProperty(key); } if (cache != null) { Element element = cache.get(key); if (element != null) { return (String) element.getObjectValue(); } } String zkData = ConfZkClient.getPathDataByKey(key); if (zkData != null) { set(key, zkData); return zkData; } return defaultVal; } /** * 刪除配置 * * @param key * @return */ public static boolean remove(String key) { if (cache != null) { logger.info("刪除配置:key ", key); return cache.remove(key); } return false; } }