1. 程式人生 > >Apache Curator操作zookeeper的API使用——watcher

Apache Curator操作zookeeper的API使用——watcher

curator在註冊watch事件上,提供了一個usingWatcher方法,使用這個方法註冊的watch事件和預設watch事件一樣,監聽只會觸發一次,監聽完畢後就會銷燬,也就是一次性的。而這個方法有兩種引數可選,一個是zk原生API的Watcher介面的實現類,另一個是Curator提供的CuratorWatcher介面的實現類,不過在usingWatcher方法上使用哪一個效果都是一樣的,都是一次性的

新建一個 MyWatcher 實現類,實現 Watcher 介面。程式碼如下:

package org.zero01.zk.curator;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
 * @program: zookeeper-connection
 * @description:  zk原生API的Watcher介面實現
 * @author: 01
 * @create: 2018-04-28 13:41
 **/
public class MyWatcher implements Watcher {

    // Watcher事件通知方法
    public void process(WatchedEvent watchedEvent) {
        System.out.println("觸發watcher,節點路徑為:" + watchedEvent.getPath());
    }
}

新建一個 MyCuratorWatcher 實現類,實現 CuratorWatcher 介面。程式碼如下:

package org.zero01.zk.curator;

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;

/**
 * @program: zookeeper-connection
 * @description: Curator提供的CuratorWatcher介面實現
 * @author: 01
 * @create: 2018-04-28 13:40
 **/
public class MyCuratorWatcher implements CuratorWatcher {

    // Watcher事件通知方法
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println("觸發watcher,節點路徑為:" + watchedEvent.getPath());
    }
}

修改 CuratorConnect 類的main方法程式碼如下,因為在usingWatcher方法上使用一個介面的實現類效果都是一樣的,所以這裡就只演示其中一種。程式碼如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 例項化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // 新增 watcher 事件,當使用usingWatcher的時候,監聽只會觸發一次,監聽完畢後就銷燬
        curatorConnect.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
        // curatorConnect.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

        Thread.sleep(100000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));
    }
}

執行該類,然後到zookeeper伺服器上修改/super/testNode節點的資料:

[zk: localhost:2181(CONNECTED) 35] set /workspace/super/testNode new-data
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb0000002b
mtime = Sat Apr 28 21:40:58 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 3
[zk: localhost:2181(CONNECTED) 36] 

修改完成後,此時控制檯輸出內容如下,因為workspace是名稱空間節點,所以不會被打印出來:

觸發watcher,節點路徑為:/super/testNode

curator之nodeCache一次註冊N次監聽

想要實現watch一次註冊n次監聽的話,我們需要使用到curator裡的一個NodeCache物件。這個物件可以用來快取節點資料,並且可以給節點新增nodeChange事件,當節點的資料發生變化就會觸發這個事件。

我們依舊是使用之前的demo進行演示,修改 CuratorConnect 類中的main方法程式碼如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 例項化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // NodeCache: 快取節點,並且可以監聽資料節點的變更,會觸發事件
        final NodeCache nodeCache = new NodeCache(curatorConnect.client, nodePath);

        // 引數 buildInitial : 初始化的時候獲取node的值並且快取
        nodeCache.start(true);

        // 獲取快取裡的節點初始化資料
        if (nodeCache.getCurrentData() != null) {
            System.out.println("節點初始化資料為:" + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("節點初始化資料為空...");
        }

        // 為快取的節點新增watcher,或者說新增監聽器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            // 節點資料change事件的通知方法
            public void nodeChanged() throws Exception {
                // 防止節點被刪除時發生錯誤
                if (nodeCache.getCurrentData() == null) {
                    System.out.println("獲取節點資料異常,無法獲取當前快取的節點資料,可能該節點已被刪除");
                    return;
                }
                // 獲取節點最新的資料
                String data = new String(nodeCache.getCurrentData().getData());
                System.out.println(nodeCache.getCurrentData().getPath() + " 節點的資料發生變化,最新的資料為:" + data);
            }
        });

        Thread.sleep(200000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));
    }
}

執行該類後,我們到zookeeper伺服器上,對/super/testNode節點進行如下操作:

[zk: localhost:2181(CONNECTED) 2] set /workspace/super/testNode change-data     
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb00000037
mtime = Sat Apr 28 23:49:42 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 3      
[zk: localhost:2181(CONNECTED) 3] set /workspace/super/testNode change-agin-data
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb00000038
mtime = Sat Apr 28 23:50:01 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 3
[zk: localhost:2181(CONNECTED) 8] delete /workspace/super/testNode
[zk: localhost:2181(CONNECTED) 9] create /workspace/super/testNode test-data
Created /workspace/super/testNode
[zk: localhost:2181(CONNECTED) 10]

輸出

當前客戶端的狀態:連線中...
節點初始化資料為:new-data
/super/testNode 節點的資料發生變化,最新的資料為:change-data
/super/testNode 節點的資料發生變化,最新的資料為:change-agin-data
獲取節點資料異常,無法獲取當前快取的節點資料,可能該節點已被刪除
/super/testNode 節點的資料發生變化,最新的資料為:test-data
當前客戶端的狀態:已關閉...

從控制檯輸出的內容可以看到,只要資料發生改變了都會觸發這個事件,並且是可以重複觸發的,而不是一次性的。

curator之PathChildrenCache子節點監聽

使用NodeCache雖然能實現一次註冊n次監聽,但是卻只能監聽一個nodeChanged事件,也就是說建立、刪除以及子節點的事件都無法監聽。如果我們要監聽某一個節點的子節點的事件,或者監聽某一個特定節點的增刪改事件都需要藉助PathChildrenCache來實現。從名稱上可以看到,PathChildrenCache也是用快取實現的,並且也是一次註冊n次監聽。當我們傳遞一個節點路徑時是監聽該節點下的子節點事件,如果我們要限制監聽某一個節點,只需要加上判斷條件即可。

我們這裡演示簡單子節點事件的監聽,修改 CuratorConnect 類的main方法程式碼如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 例項化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));

        // 父節點路徑
        String nodePath = "/super/testNode";

        // 為子節點新增watcher
        // PathChildrenCache: 監聽資料節點的增刪改,可以設定觸發的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, nodePath, true);

        /**
         * StartMode: 初始化方式
         * POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件
         * NORMAL:非同步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         */
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        // 列出子節點資料列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能獲得,非同步是獲取不到的
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("當前節點的子節點詳細資料列表:");
        for (ChildData childData : childDataList) {
            System.out.println("\t* 子節點路徑:" + new String(childData.getPath()) + ",該節點的資料為:" + new String(childData.getData()));
        }

        // 新增事件監聽器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通過判斷event type的方式來實現不同事件的觸發
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子節點初始化時觸發
                    System.out.println("\n--------------\n");
                    System.out.println("子節點初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 新增子節點時觸發
                    System.out.println("\n--------------\n");
                    System.out.print("子節點:" + event.getData().getPath() + " 新增成功,");
                    System.out.println("該子節點的資料為:" + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 刪除子節點時觸發
                    System.out.println("\n--------------\n");
                    System.out.println("子節點:" + event.getData().getPath() + " 刪除成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子節點資料時觸發
                    System.out.println("\n--------------\n");
                    System.out.print("子節點:" + event.getData().getPath() + " 資料更新成功,");
                    System.out.println("子節點:" + event.getData().getPath() + " 新的資料為:" + new String(event.getData().getData()));
                }
            }
        });

        Thread.sleep(200000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));
    }
}

執行該類,然後到zookeeper伺服器上執行如下操作:

[zk: localhost:2181(CONNECTED) 0] create /workspace/super/testNode/fourNode four-node-data
Created /workspace/super/testNode/fourNode
[zk: localhost:2181(CONNECTED) 1] set /workspace/super/testNode/oneNode change-node-data
cZxid = 0xc00000002
ctime = Sun Apr 29 18:23:57 CST 2018
mZxid = 0xc00000023
mtime = Sun Apr 29 20:16:22 CST 2018
pZxid = 0xc00000002
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] delete /workspace/super/testNode/fourNode
[zk: localhost:2181(CONNECTED) 5] create /workspace/super/testNode/fiveNode five-node-data
Created /workspace/super/testNode/fiveNode
[zk: localhost:2181(CONNECTED) 6] 

輸出

當前客戶端的狀態:連線中...
當前節點的子節點詳細資料列表:
    * 子節點路徑:/super/testNode/oneNode,該節點的資料為:one-node-data
    * 子節點路徑:/super/testNode/threeNode,該節點的資料為:three-node-data
    * 子節點路徑:/super/testNode/twoNode,該節點的資料為:two-node-data

--------------

子節點:/super/testNode/fourNode 新增成功,該子節點的資料為:four-node-data

--------------

子節點:/super/testNode/oneNode 資料更新成功,子節點:/super/testNode/oneNode 新的資料為:change-node-data

--------------

子節點:/super/testNode/fourNode 刪除成功

--------------

子節點:/super/testNode/fiveNode 新增成功,該子節點的資料為:five-node-data
當前客戶端的狀態:已關閉...

以上的演示例子中為了獲取子節點列表,所以我們的程式碼使用的是同步初始化模式。如果使用非同步初始化是獲取不到子節點列表的,例如修改childrenCache.start程式碼如下:

childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

這種模式會在初始化時觸發子節點初始化以及新增子節點這兩個事件,將初始化模式修改成POST_INITIALIZED_EVENT後,執行該類,此時控制檯輸出資訊如下:

當前客戶端的狀態:連線中...
當前節點的子節點詳細資料列表:

--------------

子節點:/super/testNode/threeNode 新增成功,該子節點的資料為:three-node-data

--------------

子節點:/super/testNode/twoNode 新增成功,該子節點的資料為:two-node-data

--------------

子節點:/super/testNode/fiveNode 新增成功,該子節點的資料為:five-node-data

--------------

子節點:/super/testNode/oneNode 新增成功,該子節點的資料為:change-node-data

--------------

子節點:/super/testNode/fourNode 新增成功,該子節點的資料為:four-node-data

--------------

子節點初始化成功
當前客戶端的狀態:已關閉...

從控制檯輸出資訊中可以看到,在這種模式下,子節點列表並沒有被獲取出來。除此之外,會觸發新增子節點事件以及子節點初始化事件。因為快取初始化時是把子節點新增到快取裡,所以會觸發新增子節點事件,而新增完成之後,就會觸發子節點初始化完成事件。

我們再來看看另一種非同步初始化的模式:NORMAL模式,在這種模式下,同樣的無法獲取子節點列表,並且也會觸發新增子節點事件,但是不會觸發子節點初始化完成事件。修改childrenCache.start程式碼如下:

childrenCache.start(PathChildrenCache.StartMode.NORMAL);

將初始化模式修改成NORMAL後,執行該類,此時控制檯輸出資訊如下:

當前客戶端的狀態:連線中...
當前節點的子節點詳細資料列表:

--------------

子節點:/super/testNode/threeNode 新增成功,該子節點的資料為:three-node-data

--------------

子節點:/super/testNode/twoNode 新增成功,該子節點的資料為:two-node-data

--------------

子節點:/super/testNode/fiveNode 新增成功,該子節點的資料為:five-node-data

--------------

子節點:/super/testNode/oneNode 新增成功,該子節點的資料為:change-node-data

--------------

子節點:/super/testNode/fourNode 新增成功,該子節點的資料為:four-node-data
當前客戶端的狀態:已關閉...

從控制檯輸出資訊中可以看到,在這種模式下,子節點列表並沒有被獲取出來。除此之外,還會觸發新增子節點事件。通常使用非同步初始化的情況下,都是使用POST_INITIALIZED_EVENT模式,NORMAL較為少用

如果我們想要監聽某一個特定的節點,例如我們要監聽/super/testNode這個節點,那麼以上面的程式碼作為例子,就需要把nodePath改為/super。然後在每一個判斷條件中,再加上一個子判斷條件,將節點限定為/super/testNode才會觸發,這樣就能實現監聽某一個節點的增刪改事件了。如下示例:

...
public class CuratorConnect {
    ...

    private static final String PARENT_NODE_PATH = "/super";  // 父節點
    private static final String NODE_PATH = "/super/testNode";  // 需要被特定監聽的節點

    public static void main(String[] args) throws Exception {
        // 例項化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));

        // 為子節點新增watcher
        // PathChildrenCache: 監聽資料節點的增刪改,可以設定觸發的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, PARENT_NODE_PATH, true);

        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        // 新增事件監聽器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通過判斷event type的方式來實現不同事件的觸發
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子節點初始化時觸發
                    System.out.println("\n--------------\n");
                    System.out.println("子節點初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 新增子節點時觸發
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子節點:" + event.getData().getPath() + " 新增成功,");
                        System.out.println("該子節點的資料為:" + new String(event.getData().getData()));
                    }
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 刪除子節點時觸發
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.println("子節點:" + event.getData().getPath() + " 刪除成功");
                    }
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子節點資料時觸發
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子節點:" + event.getData().getPath() + " 資料更新成功,");
                        System.out.println("子節點:" + event.getData().getPath() + " 新的資料為:" + new String(event.getData().getData()));
                    }
                }
            }
        });

        Thread.sleep(200000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連線中..." : "已關閉..."));
    }
}

這是最簡單粗暴的方式了,當然在實際開發中,肯定會寫得好一些,這個演示只是為了說明可以藉助PathChildrenCache來實現某個特點節點的增刪改事件監聽。

zk-watcher應用例項之模擬統一更新N臺節點的配置檔案

zookeeper有一個比較常見的應用場景就是統一管理、更新分散式叢集環境中每個節點的配置檔案,我們可以在程式碼中監聽叢集中的節點,當節點資料發生改變時就同步到其他節點上。如下圖:

因為我們使用的json作為節點儲存的資料格式,所以需要準備一個工具類來做json與pojo物件的一個轉換,也就是所謂的反序列化。建立一個 JsonUtils 類,程式碼如下:

package org.zero01.zk.util;

import java.util.List;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @Title: JsonUtils.java
 * @Package org.zero01.zk.util
 * @Description: JSON/物件轉換類
 */
public class JsonUtils {

    // 定義jackson物件
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * 將物件轉換成json字串。
     * <p>Title: pojoToJson</p>
     * <p>Description: </p>
     * @param data
     * @return
     */
    public static String objectToJson(Object data) {
        try {
            String string = MAPPER.writeValueAsString(data);
            return string;
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 將json結果集轉化為物件
     *
     * @param jsonData json資料
     * @param beanType 物件中的object型別
     * @return
     */
    public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
        try {
            T t = MAPPER.readValue(jsonData, beanType);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 將json資料轉換成pojo物件list
     * <p>Title: jsonToList</p>
     * <p>Description: </p>
     * @param jsonData
     * @param beanType
     * @return
     */
    public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
        JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
        try {
            List<T> list = MAPPER.readValue(jsonData, javaType);
            return list;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

建立一個pojo類,封裝json格式的資料:

package org.zero01.zk.util;

public class RedisConfig {

    private String type;    // add 新增配置 update 更新配置 delete 刪除配置
    private String url;        // 如果是add或update,則提供下載地址
    private String remark;    // 備註
    ... gtter stter 略 ...
}

然後建立客戶端類,客戶端類就是用來監聽叢集中的節點的。由於是模擬,所以這裡的部分程式碼是虛擬碼。客戶端類我們這裡建立了三個,因為叢集中有三個節點,由於程式碼基本上是一樣的,所以這裡只貼出客戶端_1的程式碼。如下:

package org.zero01.zk.checkconfig;

import java.util.concurrent.CountDownLatch;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;

import org.zero01.zk.util.JsonUtils;
import org.zero01.zk.util.RedisConfig;

public class Client_1 {

    public CuratorFramework client = null;
    public static final String zkServerIp = "192.168.190.128:2181";

    // 初始化重連策略以及客戶端物件並啟動
    public Client_1() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerIp)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    // 關閉客戶端
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    //  public final static String CONFIG_NODE = "/super/testNode/redis-config";
    public final static String CONFIG_NODE_PATH = "/super/testNode";
    public final static String SUB_PATH = "/redis-config";
    public static CountDownLatch countDown = new CountDownLatch(1);  // 計數器

    public static void main(String[] args) throws Exception {
        Client_1 cto = new Client_1();
        System.out.println("client1 啟動成功...");

        // 開啟子節點快取
        final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
        childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

        // 新增子節點監聽事件
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                // 監聽節點的資料更新事件
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    String configNodePath = event.getData().getPath();
                    if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
                        System.out.println("監聽到配置發生變化,節點路徑為:" + configNodePath);

                        // 讀取節點資料
                        String jsonConfig = new String(event.getData().getData());
                        System.out.println("節點" + CONFIG_NODE_PATH + "的資料為: " + jsonConfig);

                        // 從json轉換配置
                        RedisConfig redisConfig = null;
                        if (StringUtils.isNotBlank(jsonConfig)) {
                            redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
                        }

                        // 配置不為空則進行相應操作
                        if (redisConfig != null) {
                            String type = redisConfig.getType();
                            String url = redisConfig.getUrl();
                            String remark = redisConfig.getRemark();
                            // 判斷事件
                            if (type.equals("add")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("監聽到新增的配置,準備下載...");
                                // ... 連線ftp伺服器,根據url找到相應的配置
                                Thread.sleep(500);
                                System.out.println("開始下載新的配置檔案,下載路徑為<" + url + ">");
                                // ... 下載配置到你指定的目錄
                                Thread.sleep(1000);
                                System.out.println("下載成功,已經新增到專案中");
                                // ... 拷貝檔案到專案目錄
                            } else if (type.equals("update")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("監聽到更新的配置,準備下載...");
                                // ... 連線ftp伺服器,根據url找到相應的配置
                                Thread.sleep(500);
                                System.out.println("開始下載配置檔案,下載路徑為<" + url + ">");
                                // ... 下載配置到你指定的目錄
                                Thread.sleep(1000);
                                System.out.println("下載成功...");
                                System.out.println("刪除專案中原配置檔案...");
                                Thread.sleep(100);
                                // ... 刪除原檔案
                                System.out.println("拷貝配置檔案到專案目錄...");
                                // ... 拷貝檔案到專案目錄
                            } else if (type.equals("delete")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("監聽到需要刪除配置");
                                System.out.println("刪除專案中原配置檔案...");
                            }
                            // TODO 視情況統一重啟服務
                        }
                    }
                }
            }
        });

        countDown.await();

        cto.closeZKClient();
    }
}

完成以上程式碼的編寫後,將所有的客戶類都執行起來。然後到zookeeper伺服器上,進行如下操作:

[zk: localhost:2181(CONNECTED) 14] set /workspace/super/testNode/redis-config {"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}
cZxid = 0xc00000039
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000043
mtime = Mon Apr 30 01:52:35 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 75
numChildren = 0
[zk: localhost:2181(CONNECTED) 15] set /workspace/super/testNode/redis-config {"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
cZxid = 0xc00000039
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000044
mtime = Mon Apr 30 01:53:46 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 81
numChildren = 0
[zk: localhost:2181(CONNECTED) 16] set /workspace/super/testNode/redis-config {"type":"delete","url":"","remark":"delete"}   
cZxid = 0xc00000039               
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000045
mtime = Mon Apr 30 01:54:06 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 44
numChildren = 0
[zk: localhost:2181(CONNECTED) 17] 

如上,從三個客戶端的控制檯輸出資訊可以看到,三個節點都進行了同樣操作,觸發了同樣的watch事件,這樣就可以完成統一的配置檔案管理。