1. 程式人生 > >基於Curator操作ZooKeeper(二)-Watcher操作-補充TreeCache

基於Curator操作ZooKeeper(二)-Watcher操作-補充TreeCache

轉自:https://blog.csdn.net/Leafage_M/article/details/78735485#treecache

Java原生API操作ZooKeeper可參看:

Java原生API操作Zookeeper(一)

Java原生API操作Zookeeper(二)

相關內容:

基於Curator操作ZooKeeper(一)-基本操作

基於Curator操作ZooKeeper(二)-Watcher操作

TreeCache

TreeCache有點像上面兩種Cache的結合體,NodeCache能夠監聽自身節點的資料變化(或者是建立該節點),PathChildrenCache能夠監聽自身節點下的子節點的變化,而TreeCache既能夠監聽自身節點的變化、也能夠監聽子節點的變化。

TreeCache的話只有一種構造方法了:

TreeCache(CuratorFramework client, String path)//Create a TreeCache for the given client and path with default options.

與上面的PathChildrenCache不同的是,如果指定的節點路徑不存在的話,不會自動建立。但是也能夠監聽到一個INITIALIZED型別的事件。

TreeCache可以新增一個節點變化的監聽器,同樣的也可以新增一個異常的監聽器。

假如ZooKeeper伺服器中還是有如下的節點內容:

使用如下程式碼進行測試,就是當節點的內容發生變化時,輸出節點的變化型別和變化的節點路徑。

public class TreeCacheTest {

    private static final String zkAddress = "centos3";
    private static final int sessionTimeout = 2000;
    private static String parentPath = "/Curator-Recipes1";//父節點

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .sessionTimeoutMs(sessionTimeout)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .build();

    public static void main(String[] args) throws InterruptedException {
        client.start();
        final TreeCache treeCache = new TreeCache(client, parentPath);
        try {
            treeCache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        //新增錯誤監聽器
        treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
            public void unhandledError(String s, Throwable throwable) {
                System.out.println(".錯誤原因:" + throwable.getMessage() + "\n==============\n");
            }
        });

        //節點變化的監聽器
        treeCache.getListenable().addListener(new TreeCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("treeCache ------ Type:" + treeCacheEvent.getType() + ",");
                System.out.println(treeCacheEvent.getData().getPath());

            }
        });

        Thread.sleep(Integer.MAX_VALUE);
    }
}

執行時輸出的內容如下:

可以看到快取父節點和子節點的時候觸發了NODE_ADDED事件,同時也會觸發一個INITIALIZED事件,並且會在異常監聽器中發生回撥事件,異常的原因就是我們回撥函式因為INITIALIZED事件被觸發了,但是此時執行第條輸出語句的時候發現treeCacheEvent.getData()是null,而我們在null上呼叫了getPath方法,所以會觸發異常監聽。

注意:

  • 無論如何都會收到一個INITIALIZED事件的。
  • 無論是TreeCache、PathChildrenCache,所謂的監聽都是本地檢視和ZooKeeper伺服器進行對比。所以如果ZooKeeper節點不為空的話,才會在快取開始的時候監聽到NODE_ADDED事件,這是因為剛開始本地快取並沒有內容,然後本地快取和伺服器快取進行對比,發現ZooKeeper伺服器有節點而本地快取沒有,這才將伺服器的節點快取到本地,所以才會觸發NODE_ADDED事件。

示例

來一個綜合性的示例,該示例中先使用init進行初始化操作,會建立一個父節點“/Curator-Recipes”以及父節點下的一個子節點“c1”。然後分別使用TreeCache、PathChildrenCache、NodeCache三種快取方式進行快取。最後再建立c2、c3兩個節點並進行修改和刪除觀察監聽器的輸出內容。

package com.leafage.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
 * @Author Leafage
 * @Date 2017/12/6 14:03
 **/
public class CuratorRecipes {

    private static int num = 1;

    private static final String zkAddress = "centos3";
    private static final int sessionTimeout = 2000;
    private static String parentPath = "/Curator-Recipes";//父節點

    private static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(zkAddress)
            .sessionTimeoutMs(sessionTimeout)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .build();

    public static void main(String[] args) throws InterruptedException {

        init();
        treeCache();
        pathChildrenCache();
        nodeCache();
        testData();
        Thread.sleep(Integer.MAX_VALUE);

    }

    /**
     * 初始化操作,建立父節點
     */
    public static void init() {
        client.start();
        try {
            //可以用用來確保父節點存在,2.9之後棄用
//            EnsurePath ensurePath = new EnsurePath(parentPath);
//            ensurePath.ensure(client.getZookeeperClient());

            if (client.checkExists().forPath(parentPath) == null) {
                client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(parentPath, "This is Parent Data!".getBytes());
            }

            client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c1","This is C1.".getBytes());//建立第一個節點

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 監聽子節點變化
     */
    public static void pathChildrenCache() {
        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, parentPath, true);
        try {
            pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//啟動模式
        } catch (Exception e) {
            e.printStackTrace();
        }
        //新增監聽
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println( num++ + ".pathChildrenCache------發生的節點變化型別為:" + pathChildrenCacheEvent.getType() + ",發生變化的節點內容為:" + new String(pathChildrenCacheEvent.getData().getData()) + "\n======================\n");
            }
        });
    }


    /**
     * 監聽節點資料變化
     */
    public static void nodeCache() {
        final NodeCache nodeCache = new NodeCache(client, parentPath, false);
        try {
            nodeCache.start(true);//true代表快取當前節點
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (nodeCache.getCurrentData() != null) {//只有start中的設定為true才能夠直接得到
            System.out.println( num++ + ".nodeCache-------CurrentNode Data is:" + new String(nodeCache.getCurrentData().getData()) + "\n===========================\n");//輸出當前節點的內容
        }

        //新增節點資料監聽
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println( num++ + ".nodeCache------節點資料發生了改變,發生的路徑為:" + nodeCache.getCurrentData().getPath() + ",節點資料發生了改變 ,新的資料為:" + new String(nodeCache.getCurrentData().getData()) + "\n===========================\n");
            }
        });
    }

    /**
     * 同時監聽資料變化和子節點變化
     */
    public static void treeCache() {
        final TreeCache treeCache = new TreeCache(client, parentPath);
        try {
            treeCache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        //新增錯誤
        treeCache.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
            public void unhandledError(String s, Throwable throwable) {
                System.out.println(num++ + ".錯誤原因:" + throwable.getMessage() + "\n==============\n");
            }
        });

        treeCache.getListenable().addListener(new TreeCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println( num++ + ".treeCache------當前發生的變化型別為:" +  treeCacheEvent.getType() + ",發生變化的節點內容為:" + new String(treeCacheEvent.getData().getData()) + "\n=====================\n");
            }
        });
    }

    /**
     * 建立節點、修改資料、刪除節點等操作,用來給其他的監聽器測試使用
     */
    public static void testData() {
        try {
            client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c2","This is C2.".getBytes());//建立第一個節點
            client.create().withMode(CreateMode.EPHEMERAL).forPath(parentPath + "/c3","This is C3.".getBytes());//建立第一個節點

            client.setData().forPath(parentPath + "/c2", "This is New C2.".getBytes());//修改節點資料

            client.delete().forPath(parentPath + "/c3");//刪除一個節點

            client.delete().deletingChildrenIfNeeded().forPath(parentPath);//將父節點下所有內容刪除

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

輸出內容(每次執行結果輸出順序可能不一致):

1.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is Parent Data!
=====================

2.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is C1.
=====================

4.錯誤原因:null
==============

5.nodeCache-------CurrentNode Data is:This is Parent Data!
===========================

6.pathChildrenCache------發生的節點變化型別為:CHILD_ADDED,發生變化的節點內容為:This is C1.
======================

8.nodeCache------節點資料發生了改變,發生的路徑為:/Curator-Recipes,節點資料發生了改變 ,新的資料為:This is Parent Data!
===========================

9.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is C2.
=====================

10.treeCache------當前發生的變化型別為:NODE_ADDED,發生變化的節點內容為:This is C3.
=====================

11.pathChildrenCache------發生的節點變化型別為:CHILD_ADDED,發生變化的節點內容為:This is C3.
======================

12.pathChildrenCache------發生的節點變化型別為:CHILD_ADDED,發生變化的節點內容為:This is C2.
======================

13.treeCache------當前發生的變化型別為:NODE_UPDATED,發生變化的節點內容為:This is New C2.
=====================

14.pathChildrenCache------發生的節點變化型別為:CHILD_UPDATED,發生變化的節點內容為:This is New C2.
======================

15.pathChildrenCache------發生的節點變化型別為:CHILD_REMOVED,發生變化的節點內容為:This is C3.
======================

16.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is C3.
=====================

17.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is C1.
=====================

18.pathChildrenCache------發生的節點變化型別為:CHILD_REMOVED,發生變化的節點內容為:This is C1.
======================

19.pathChildrenCache------發生的節點變化型別為:CHILD_REMOVED,發生變化的節點內容為:This is New C2.
======================

20.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is New C2.
=====================

21.treeCache------當前發生的變化型別為:NODE_REMOVED,發生變化的節點內容為:This is Parent Data!
=====================

從輸出內容可以看到:

1、2 :代表是TreeCache監聽到了父節點和c1節點的建立快取事件。

3、4 :同時會發現並沒有3這條語句,而是直接跳到了4,這是因為接收到的事件為:INITIALIZED,所以使用getData會得到null,而我們試圖在null上呼叫getPath,所以才會觸發異常。

5:NodeCache快取進行start的時候傳入true引數,所以能夠直接得到當前節點的內容。

6: PathChildrenCache快取成功c1的時候接收到的事件。

7:會發現沒有7,因為PathChildrenCache的啟動模式是:INITIALIZED,此時也是試圖在null上呼叫GetPath,但是PathChildrenCache沒有提供異常監聽器,所以沒辦法獲取。

8:第八點最讓人疑惑了,因為上面的程式碼中並沒有對父節點的資料進行改變,但是卻監聽到了這個事件,做了很多的測試發現,觸發這個事件的原因為後面的testData方法中呼叫create導致的,並且只會監聽到一次,這一點的具體原因還不太清楚。

9、10、11、12:建立c2、c3節點是TreeCache和PathChildrenCache監聽到的事件。

13、14:修改c2節點資料,TreeCache和PathChildrenCache監聽到的事件。

15、16、17、18、19、20:刪除c2、c1、c3節點時,TreeCache和PathChildrenCache監聽到的事件。

21:刪除根節點時接收到的監聽事件,此時只有TreeCache能夠監聽到。