ZooKeeper 永久監聽節點,以保證叢集間的一致性
阿新 • • 發佈:2019-01-23
首先是封裝好的 ZkClient:
public class ZkClient { public Logger logger = LoggerFactory.getLogger(getClass()); public ZooKeeper zookeeper; private static int SESSION_TIME_OUT = 2000; private IWatchEvent watchEvent; private static CountDownLatch countDownLatch = new CountDownLatch(1); private SessionWatcher sessionWatcher = new SessionWatcher(); public ZkClient(){ } public ZkClient(IWatchEvent we){ this.watchEvent = we; } /** * 連線zookeeper * @param host * @throws IOException * @throws InterruptedException */ public void connectZookeeper(String host){ logger.info(Log.op("連線ZooKeeper開始").kv("host:", host).toString()); try { zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, sessionWatcher); //連線超時4S countDownLatch.await(4000, TimeUnit.MILLISECONDS); } catch (Exception e) { logger.error(Log.op("連線ZooKeeper異常").kv("host:", host).toString(), e); } logger.info(Log.op("連線ZooKeeper完成").kv("host:", host).toString()); } class SessionWatcher implements Watcher { public void process(WatchedEvent watchedEvent) { logger.info(Log.op("zookeeper事件") .kv("state:", watchedEvent.getState()).kv("type:", watchedEvent.getType()).toString()); if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ countDownLatch.countDown(); } if (watchEvent != null){ watchEvent.watchAfter(watchedEvent); } } } /** * 根據路徑建立節點,並且設定節點資料 * @param path * @param data * @return * @throws KeeperException * @throws InterruptedException */ public String createNode(String path,byte[] data){ logger.info(Log.op("建立ZooKeeper節點").kv("nodePath:", path).kv("data:", data).toString()); String s = null; try { s = this.zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (Exception e) { logger.error(Log.op("建立ZooKeeper節點異常").kv("nodePath:", path).kv("data:", data).toString(), e); } return s; } /** * 節點是否存在 * @param path * @return * @throws KeeperException * @throws InterruptedException */ public boolean exists(String path) { boolean bl = false; try { if (this.zookeeper.exists(path, true) != null) { bl = true; 
} } catch (Exception e) { logger.error(Log.op("判斷zookeeper節點是否存在異常").toString(), e); } return bl; } /** * 為節點設定監聽 * @param path * @throws KeeperException * @throws InterruptedException */ public void existsWatch(String path) { try { this.zookeeper.exists(path, sessionWatcher); } catch (Exception e) { logger.error(Log.op("設定ZooKeeper節點監聽異常").kv("nodePath:", path).toString(), e); } } /** * 關閉zookeeper連線 * @throws InterruptedException */ public void closeConnect() { if(null != zookeeper){ try { zookeeper.close(); } catch (InterruptedException e) { logger.error(Log.op("關閉zookeeper連線異常").toString(), e); } } } }
只是簡單地寫了一些自己會用到的操作。
介面 IWatchEvent 用來在 watch 事件觸發時執行自訂操作:
/**
 * Hook for running custom logic when a ZooKeeper watch event is received.
 */
@FunctionalInterface
public interface IWatchEvent {

    /**
     * Handles a single watch event.
     *
     * @param watchedEvent the event delivered by ZooKeeper
     */
    void watchAfter(WatchedEvent watchedEvent);
}
最後是關鍵的ZkConfig類watch 是一次性監聽,所以每次都註冊新的監聽@Configuration public class ZkConfig { @Value("${zookeeper.host}") private String host; @Value("${zookeeper.nodePath.rule}") private String nodePath; public Logger logger = LoggerFactory.getLogger(getClass()); private ZkClient client = null; @Resource DroolsHelperService droolsHelperService; class CusWatcherEvent implements IWatchEvent{ @Override public void watchAfter(WatchedEvent watchedEvent) { try { if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged){ logger.info(Log.op("ZooKeeper節點資料變化").kv("nodePath:", nodePath).toString()); droolsHelperService.loadRule(); } //失效重連 if (watchedEvent.getState() == Watcher.Event.KeeperState.Expired) { client.closeConnect(); client.connectZookeeper(host); } } catch (Exception e) { logger.error(Log.op("載入規則異常").kv("nodePath:", nodePath).toString(), e); }finally { logger.info(Log.op("重新監聽ZooKeeper節點").kv("nodePath:", nodePath).toString()); client.existsWatch(nodePath); } } } /** * 新增zk節點監聽 */ @PostConstruct private void init(){ client = new ZkClient(new CusWatcherEvent()); client.connectZookeeper(host); byte[] bytes = new byte[]{1}; if (!client.exists(nodePath)) { client.createNode(nodePath, bytes); }else{ logger.info(Log.op("ZooKeeper節點已存在").kv("nodePath:", nodePath).toString()); } } }
對zookeeper並不是很瞭解
有個問題
過了一段時間(一個小時以上,具體多久不清楚) 會發生 Watcher.Event.KeeperState.Expired 事件,然後就失去對zk節點的監聽,所以我在程式碼裡檢測這個事件,然後重連zk。
寫的應該是有問題的,希望有大俠能解答我的疑問 :)