基於zookeeper的分散式配置中心-熱更新配置
阿新 • • 發佈:2021-10-14
//zookeeper,工具類 public class ZkUUtils { private static ZooKeeper zK; private static DefaultWatch defaultWatch = new DefaultWatch(); private static CountDownLatch cc = new CountDownLatch(1); public static ZooKeeper getZk(){ try { zK = new ZooKeeper("1277.0.0.1:2181/testConfig2",3000,defaultWatch); defaultWatch.setCountDownLatch(cc); cc.await(); }catch (Exception e) { e.printStackTrace(); } return zK; } }
//預設監聽 @Data public class DefaultWatch implements Watcher { CountDownLatch countDownLatch; @Override public void process(WatchedEvent event) { switch (event.getState()) { caseUnknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: countDownLatch.countDown(); break; case AuthFailed: break;case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } } }
//測試config @Data public class PConfig { private String config; }
//基於zk回撥的配置熱更 @Data public class PConfigWatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private PConfig pConfig; private ZooKeeper zk; private CountDownLatch cc; @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: zk.getData("/PTestConfig", this,this,"CCC"); break; case NodeDeleted: pConfig.setConfig(""); cc = new CountDownLatch(1); break; case NodeDataChanged: zk.getData("/PTestConfig", this,this,"CCC"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; case PersistentWatchRemoved: break; } } //StatCallback @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if(Objects.nonNull(stat)){ zk.getData("/PTestConfig", this,this,"CCC"); } } @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { String cfg = new String(data); pConfig.setConfig(cfg); cc.countDown(); } public void await(){ zk.exists("/PTestConfig",this,this,"AAA"); try { cc.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
//測試類 public class PTestConfigMain { private ZooKeeper zk; @Before public void getZk(){ zk = ZkUUtils.getZk(); } @After public void close(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void testConfig() throws InterruptedException { PConfigWatchCallBack pConfigWatchCallBack = new PConfigWatchCallBack(); PConfig config = new PConfig(); pConfigWatchCallBack.setPConfig(config); pConfigWatchCallBack.setZk(zk); CountDownLatch countDownLatch = new CountDownLatch(1); pConfigWatchCallBack.setCc(countDownLatch); pConfigWatchCallBack.await(); while(true){ if(config.getConfig().equals("")){ System.out.println("配置被刪了"); pConfigWatchCallBack.await(); }else{ System.out.println(config.getConfig()); } TimeUnit.SECONDS.sleep(2); } } }
在zookeeper中建立根節點,/testConfig2
在testConfig2下設定配置,PTestConfig,程式碼回去讀取節點PTestConfig的data,將資料讀到config類中,當節點被刪除後,測試類中配置提取會阻塞,節點建立或者修改時,會更新
讀取到相應的內容