1. 程式人生 > >zookeeper 永久監聽節點 來保證叢集間一致性

zookeeper 永久監聽節點 來保證叢集間一致性

先是封裝的 zkClient

public class ZkClient {

    public Logger logger = LoggerFactory.getLogger(getClass());
    public ZooKeeper zookeeper;
    private static int SESSION_TIME_OUT = 2000;
    private IWatchEvent watchEvent;
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private SessionWatcher sessionWatcher = new SessionWatcher();

    public ZkClient(){

    }

    public ZkClient(IWatchEvent we){
        this.watchEvent = we;
    }


    /**
     * 連線zookeeper
     * @param host
     * @throws IOException
     * @throws InterruptedException
     */
    public void connectZookeeper(String host){
        logger.info(Log.op("連線ZooKeeper開始").kv("host:", host).toString());
        try {
            zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, sessionWatcher);
            //連線超時4S
            countDownLatch.await(4000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            logger.error(Log.op("連線ZooKeeper異常").kv("host:", host).toString(), e);
        }
        logger.info(Log.op("連線ZooKeeper完成").kv("host:", host).toString());
    }

    class SessionWatcher implements Watcher {

        public void process(WatchedEvent watchedEvent) {
            logger.info(Log.op("zookeeper事件")
                    .kv("state:", watchedEvent.getState()).kv("type:", watchedEvent.getType()).toString());
            if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                countDownLatch.countDown();
            }

            if (watchEvent != null){
                watchEvent.watchAfter(watchedEvent);
            }

        }

    }

    /**
     * 根據路徑建立節點,並且設定節點資料
     * @param path
     * @param data
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String createNode(String path,byte[] data){
        logger.info(Log.op("建立ZooKeeper節點").kv("nodePath:", path).kv("data:", data).toString());
        String s = null;
        try {
            s = this.zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (Exception e) {
            logger.error(Log.op("建立ZooKeeper節點異常").kv("nodePath:", path).kv("data:", data).toString(), e);
        }
        return s;
    }

    /**
     * 節點是否存在
     * @param path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public boolean exists(String path) {
        boolean bl = false;
        try {
            if (this.zookeeper.exists(path, true) != null) {
                bl = true;
            }
        } catch (Exception e) {
            logger.error(Log.op("判斷zookeeper節點是否存在異常").toString(), e);

        }
        return bl;
    }

    /**
     * 為節點設定監聽
     * @param path
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void existsWatch(String path) {
        try {
            this.zookeeper.exists(path, sessionWatcher);
        } catch (Exception e) {
            logger.error(Log.op("設定ZooKeeper節點監聽異常").kv("nodePath:", path).toString(), e);
        }
    }

    /**
     * 關閉zookeeper連線
     * @throws InterruptedException
     */
    public void closeConnect() {
        if(null != zookeeper){
            try {
                zookeeper.close();
            } catch (InterruptedException e) {
                logger.error(Log.op("關閉zookeeper連線異常").toString(), e);
            }
        }
    }

}

只是簡單是寫了些自己要用到的操作,

介面 IWatchEvent 用來在watch事件執行個性化操作

public interface IWatchEvent {

    void watchAfter(WatchedEvent watchedEvent);
}
最後是關鍵的ZkConfig類
@Configuration
public class ZkConfig {

    @Value("${zookeeper.host}")
    private String host;

    @Value("${zookeeper.nodePath.rule}")
    private String nodePath;

    public Logger logger = LoggerFactory.getLogger(getClass());

    private ZkClient client = null;

    @Resource
    DroolsHelperService droolsHelperService;

    class CusWatcherEvent implements IWatchEvent{

        @Override
        public void watchAfter(WatchedEvent watchedEvent) {
            try {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged){
                    logger.info(Log.op("ZooKeeper節點資料變化").kv("nodePath:", nodePath).toString());
                    droolsHelperService.loadRule();
                }
                //失效重連
                if (watchedEvent.getState() == Watcher.Event.KeeperState.Expired) {
                    client.closeConnect();
                    client.connectZookeeper(host);
                }

            } catch (Exception e) {
                logger.error(Log.op("載入規則異常").kv("nodePath:", nodePath).toString(), e);
            }finally {
                logger.info(Log.op("重新監聽ZooKeeper節點").kv("nodePath:", nodePath).toString());
                client.existsWatch(nodePath);
            }
        }
    }
    /**
     * 新增zk節點監聽
     */
    @PostConstruct
    private void init(){
        client = new ZkClient(new CusWatcherEvent());
        client.connectZookeeper(host);
        byte[] bytes = new byte[]{1};
        if (!client.exists(nodePath)) {
            client.createNode(nodePath, bytes);
        }else{
            logger.info(Log.op("ZooKeeper節點已存在").kv("nodePath:", nodePath).toString());
        }

    }

}
watch 是一次性監聽,所以每次都註冊新的監聽


對zookeeper並不是很瞭解

有個問題

過了一段時間(一個小時以上,具體多久不清楚) 會發生 Watcher.Event.KeeperState.Expired 事件,然後就失去對zk節點的監聽,所以我在程式碼裡檢測這個事件,然後重連zk。

寫的應該是有問題的,,希望有大俠能解答我的疑問,,    :)