1. 程式人生 > >Apache Curator操作zookeeper的API使用

Apache Curator操作zookeeper的API使用

zookeeper 分布式 集群 curator 中間件

curator簡介與客戶端之間的異同點

常用的zookeeper java客戶端:

  • zookeeper原生Java API
  • zkclient
  • Apache curator

ZooKeeper原生Java API的不足之處:

  • 在連接zk超時的時候,不支持自動重連,需要手動操作
  • Watch註冊一次就會失效,需要反復註冊
  • 不支持遞歸創建節點

Apache curator:

  • Apache 的開源項目
  • 解決Watch註冊一次就會失效的問題
  • 提供的 API 更加簡單易用
  • 提供更多解決方案並且實現簡單,例如:分布式鎖
  • 提供常用的ZooKeeper工具類
  • 編程風格更舒服,

搭建maven工程,建立curator與zkserver的連接

創建一個普通的maven工程,在pom.xml文件中,配置如下依賴:

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.7.4</version>
        </dependency>
    </dependencies>

配置完依賴後,我們就可以來寫一個簡單的demo測試與zookeeper服務端的連接。代碼如下:

package org.zero01.zk.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @program: zookeeper-connection
 * @description: 建立curator與zkserver的連接演示demo
 * @author: 01
 * @create: 2018-04-28 09:44
 **/
public class CuratorConnect {

    // Curator客戶端
    public CuratorFramework client = null;
    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";

    public CuratorConnect(){
        /**
         * 同步創建zk示例,原生api是異步的
         * 這一步是設置重連策略
         *
         * ExponentialBackoffRetry構造器參數:
         *  curator鏈接zookeeper的策略:ExponentialBackoffRetry
         *  baseSleepTimeMs:初始sleep的時間
         *  maxRetries:最大重試次數
         *  maxSleepMs:最大重試時間
         */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

        // 實例化Curator客戶端,Curator的編程風格可以讓我們使用方法鏈的形式完成客戶端的實例化
        client = CuratorFrameworkFactory.builder() // 使用工廠類來建造客戶端的實例對象
                .connectString(zkServerIps)  // 放入zookeeper服務器ip
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 設定會話時間以及重連策略
                .build();  // 建立連接通道

        // 啟動Curator客戶端
        client.start();
    }

    // 關閉zk客戶端連接
    private void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        Thread.sleep(1000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

控制臺輸出信息如下:

當前客戶端的狀態:連接中...
當前客戶端的狀態:已關閉...

curator連接zookeeper服務器時有自動重連機制,而curator的重連策略有五種。第一種就是我們以上demo中使用到的:

/**
 * (推薦)
 * 同步創建zk示例,原生api是異步的
 * 這一步是設置重連策略
 * 
 * 構造器參數:
 *  curator鏈接zookeeper的策略:ExponentialBackoffRetry
 *  baseSleepTimeMs:初始sleep的時間
 *  maxRetries:最大重試次數
 *  maxSleepMs:最大重試時間
 */
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

第二種,可設定重連n次:

/**
 * (推薦)
 * curator鏈接zookeeper的策略:RetryNTimes
 * 
 * 構造器參數:
 * n:重試的次數
 * sleepMsBetweenRetries:每次重試間隔的時間
 */
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

第三種,只會重連一次:

/**
 * (不推薦)
 * curator鏈接zookeeper的策略:RetryOneTime
 * 
 * 構造器參數:
 * sleepMsBetweenRetry:每次重試間隔的時間
 * 這個策略只會重試一次
 */
RetryPolicy retryPolicy2 = new RetryOneTime(3000);

第四種,永遠重連:

/**
 * 永遠重試,不推薦使用
 */
RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)

第五種,可設定最大重試時間:

/**
 * curator鏈接zookeeper的策略:RetryUntilElapsed
 * 
 * 構造器參數:
 * maxElapsedTimeMs:最大重試時間
 * sleepMsBetweenRetries:每次重試間隔
 * 重試時間超過maxElapsedTimeMs後,就不再重試
 */
RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

zk命名空間以及創建節點

zookeeper的命名空間就類似於我們平時使用Eclipse等開發工具的工作空間一樣,我們該連接中所有的操作都是基於這個命名空間的。curator提供了設置命名空間的方法,這樣我們任何的連接都可以去設置一個命名空間。設置了命名空間並成功連接後,zookeeper的根節點會多出一個以命名空間名稱所命名的節點。然後我們在該連接的增刪查改等操作都會在這個節點中進行。例如,現在zookeeper服務器上只有以下幾個節點:

[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper, data, real-culster, testDigestNode]
[zk: localhost:2181(CONNECTED) 1]

然後我們來將之前的demo修改一下,加上設置命名空間的代碼以及創建節點的代碼,以此來做一個簡單的演示,修改之前的demo代碼如下:

package org.zero01.zk.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

public class CuratorConnect {

    // Curator客戶端
    public CuratorFramework client = null;
    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";

    public CuratorConnect() {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

        // 實例化Curator客戶端
        client = CuratorFrameworkFactory.builder() // 使用工廠類來建造客戶端的實例對象
                .connectString(zkServerIps)  // 放入zookeeper服務器ip
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 設定會話時間以及重連策略
                .namespace("workspace").build();  // 設置命名空間以及開始建立連接

        // 啟動Curator客戶端
        client.start();
    }

    // 關閉zk客戶端連接
    private void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 創建節點
        String nodePath = "/super/testNode";  // 節點路徑
        byte[] data = "this is a test data".getBytes();  // 節點數據
        String result = curatorConnect.client.create().creatingParentsIfNeeded()  // 創建父節點,也就是會遞歸創建
                .withMode(CreateMode.PERSISTENT)  // 節點類型
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)  // 節點的acl權限
                .forPath(nodePath, data);

        System.out.println(result + "節點,創建成功...");

        Thread.sleep(1000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類,控制臺輸出信息如下:

當前客戶端的狀態:連接中...
/super/testNode節點,創建成功...
當前客戶端的狀態:已關閉...

到服務器上,查看是否多了一個 workspace 節點,並且我們創建的節點都在這個節點下:

[zk: localhost:2181(CONNECTED) 12] ls /
[workspace, zookeeper, data, real-culster, testDigestNode]
[zk: localhost:2181(CONNECTED) 13] ls /workspace
[super]
[zk: localhost:2181(CONNECTED) 14] ls /workspace/super
[testNode]
[zk: localhost:2181(CONNECTED) 15] get /workspace/super/testNode
this is a test data
cZxid = 0xb0000000f
ctime = Sat Apr 28 18:56:36 CST 2018
mZxid = 0xb0000000f
mtime = Sat Apr 28 18:56:36 CST 2018
pZxid = 0xb0000000f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 19
numChildren = 0
[zk: localhost:2181(CONNECTED) 18] getAcl /workspace/super/testNode
‘world,‘anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 19] 

修改節點以及刪除節點

上一節中,我們介紹了如何創建節點,本節我們來簡單演示一下如何修改節點的數據以及刪除節點。修改 CuratorConnect 類代碼如下:

package org.zero01.zk.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class CuratorConnect {

    // Curator客戶端
    public CuratorFramework client = null;
    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";

    public CuratorConnect() {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

        // 實例化Curator客戶端
        client = CuratorFrameworkFactory.builder() // 使用工廠類來建造客戶端的實例對象
                .connectString(zkServerIps)  // 放入zookeeper服務器ip
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)  // 設定會話時間以及重連策略
                .namespace("workspace").build();  // 設置命名空間以及開始建立連接

        // 啟動Curator客戶端
        client.start();
    }

    // 關閉zk客戶端連接
    private void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // 更新節點數據
        byte[] newData = "this is a new data".getBytes();
        Stat resultStat = curatorConnect.client.setData().withVersion(0)  // 指定數據版本
                .forPath(nodePath, newData);  // 需要修改的節點路徑以及新數據

        System.out.println("更新節點數據成功,新的數據版本為:" + resultStat.getVersion());

        // 刪除節點
        curatorConnect.client.delete()
                .guaranteed()  // 如果刪除失敗,那麽在後端還是會繼續刪除,直到成功
                .deletingChildrenIfNeeded()  // 子節點也一並刪除,也就是會遞歸刪除
                .withVersion(resultStat.getVersion())
                .forPath(nodePath);

        Thread.sleep(1000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類,控制臺輸出信息如下:

當前客戶端的狀態:連接中...
更新節點數據成功,新的數據版本為:1
當前客戶端的狀態:已關閉...

此時到zookeeper服務器上,可以看到,節點已經被成功刪除了:

[zk: localhost:2181(CONNECTED) 19] ls /workspace/super
[]
[zk: localhost:2181(CONNECTED) 20] 

查詢節點相關信息

1.獲取某個節點的數據,現有一個節點的數據如下:

[zk: localhost:2181(CONNECTED) 22] get /workspace/super/testNode    
test-data
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb00000015
mtime = Sat Apr 28 20:59:57 CST 2018
pZxid = 0xb00000015
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
[zk: localhost:2181(CONNECTED) 23] 

修改 CuratorConnect 類中的main方法代碼如下,一些重復的代碼就忽略了:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // 讀取節點數據
        Stat stat = new Stat();
        byte[] nodeData = curatorConnect.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("節點 " + nodePath + " 的數據為:" + new String(nodeData));
        System.out.println("該節點的數據版本號為:" + stat.getVersion());

        Thread.sleep(1000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類,控制臺輸出內容如下:

當前客戶端的狀態:連接中...
節點 /super/testNode 的數據為:test-data
該節點的數據版本號為:0
當前客戶端的狀態:已關閉...

2.獲取某個節點下的子節點列表,現有一個節點的子節點列表如下:

[zk: localhost:2181(CONNECTED) 33] ls /workspace/super/testNode
[threeNode, twoNode, oneNode]
[zk: localhost:2181(CONNECTED) 34]

修改 CuratorConnect 類中的main方法代碼如下,一些重復的代碼就忽略了:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // 獲取子節點列表
        List<String> childNodes = curatorConnect.client.getChildren().forPath(nodePath);
        System.out.println(nodePath + " 節點下的子節點列表:");
        for (String childNode : childNodes) {
            System.out.println(childNode);
        }

        Thread.sleep(1000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類,控制臺輸出內容如下:

當前客戶端的狀態:連接中...
/super/testNode 節點下的子節點列表:
threeNode
twoNode
oneNode
當前客戶端的狀態:已關閉...

3.查詢某個節點是否存在,修改 CuratorConnect 類中的main方法代碼如下,一些重復的代碼就忽略了:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // 查詢某個節點是否存在,存在就會返回該節點的狀態信息,如果不存在的話則返回空
        Stat statExist = curatorConnect.client.checkExists().forPath(nodePath);
        if (statExist == null) {
            System.out.println(nodePath + " 節點不存在");
        } else {
            System.out.println(nodePath + " 節點存在");
        }

        Thread.sleep(1000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類,控制臺輸出內容如下:

當前客戶端的狀態:連接中...
/super/testNode 節點存在
當前客戶端的狀態:已關閉...

如果查詢一個不存在的節點,就會返回null,我們可以測試一下將nodePath改成一個不存在的節點。然後運行該類,控制臺輸出內容如下:

當前客戶端的狀態:連接中...
/super/asdasdasd 節點不存在
當前客戶端的狀態:已關閉...

至此,使用curator對zookeeper節點的增刪查改操作就演示完畢了。


curator之usingWatcher

curator在註冊watch事件上,提供了一個usingWatcher方法,使用這個方法註冊的watch事件和默認watch事件一樣,監聽只會觸發一次,監聽完畢後就會銷毀,也就是一次性的。而這個方法有兩種參數可選,一個是zk原生API的Watcher接口的實現類,另一個是Curator提供的CuratorWatcher接口的實現類,不過在usingWatcher方法上使用哪一個效果都是一樣的,都是一次性的。

新建一個 MyWatcher 實現類,實現 Watcher 接口。代碼如下:

package org.zero01.zk.curator;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
 * @program: zookeeper-connection
 * @description:  zk原生API的Watcher接口實現
 * @author: 01
 * @create: 2018-04-28 13:41
 **/
public class MyWatcher implements Watcher {

    // Watcher事件通知方法
    public void process(WatchedEvent watchedEvent) {
        System.out.println("觸發watcher,節點路徑為:" + watchedEvent.getPath());
    }
}

新建一個 MyCuratorWatcher 實現類,實現 CuratorWatcher 接口。代碼如下:

package org.zero01.zk.curator;

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;

/**
 * @program: zookeeper-connection
 * @description: Curator提供的CuratorWatcher接口實現
 * @author: 01
 * @create: 2018-04-28 13:40
 **/
public class MyCuratorWatcher implements CuratorWatcher {

    // Watcher事件通知方法
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println("觸發watcher,節點路徑為:" + watchedEvent.getPath());
    }
}

修改 CuratorConnect 類的main方法代碼如下,因為在usingWatcher方法上使用一個接口的實現類效果都是一樣的,所以這裏就只演示其中一種。代碼如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // 添加 watcher 事件,當使用usingWatcher的時候,監聽只會觸發一次,監聽完畢後就銷毀
        curatorConnect.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
        // curatorConnect.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);

        Thread.sleep(100000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類,然後到zookeeper服務器上修改/super/testNode節點的數據:

[zk: localhost:2181(CONNECTED) 35] set /workspace/super/testNode new-data
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb0000002b
mtime = Sat Apr 28 21:40:58 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 3
[zk: localhost:2181(CONNECTED) 36] 

修改完成後,此時控制臺輸出內容如下,因為workspace是命名空間節點,所以不會被打印出來:

觸發watcher,節點路徑為:/super/testNode

curator之nodeCache一次註冊N次監聽

想要實現watch一次註冊n次監聽的話,我們需要使用到curator裏的一個NodeCache對象。這個對象可以用來緩存節點數據,並且可以給節點添加nodeChange事件,當節點的數據發生變化就會觸發這個事件。

我們依舊是使用之前的demo進行演示,修改 CuratorConnect 類中的main方法代碼如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 節點路徑
        String nodePath = "/super/testNode";

        // NodeCache: 緩存節點,並且可以監聽數據節點的變更,會觸發事件
        final NodeCache nodeCache = new NodeCache(curatorConnect.client, nodePath);

        // 參數 buildInitial : 初始化的時候獲取node的值並且緩存
        nodeCache.start(true);

        // 獲取緩存裏的節點初始化數據
        if (nodeCache.getCurrentData() != null) {
            System.out.println("節點初始化數據為:" + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("節點初始化數據為空...");
        }

        // 為緩存的節點添加watcher,或者說添加監聽器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            // 節點數據change事件的通知方法
            public void nodeChanged() throws Exception {
                // 防止節點被刪除時發生錯誤
                if (nodeCache.getCurrentData() == null) {
                    System.out.println("獲取節點數據異常,無法獲取當前緩存的節點數據,可能該節點已被刪除");
                    return;
                }
                // 獲取節點最新的數據
                String data = new String(nodeCache.getCurrentData().getData());
                System.out.println(nodeCache.getCurrentData().getPath() + " 節點的數據發生變化,最新的數據為:" + data);
            }
        });

        Thread.sleep(200000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類後,我們到zookeeper服務器上,對/super/testNode節點進行如下操作:

[zk: localhost:2181(CONNECTED) 2] set /workspace/super/testNode change-data     
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb00000037
mtime = Sat Apr 28 23:49:42 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 3      
[zk: localhost:2181(CONNECTED) 3] set /workspace/super/testNode change-agin-data
cZxid = 0xb00000015
ctime = Sat Apr 28 20:59:57 CST 2018
mZxid = 0xb00000038
mtime = Sat Apr 28 23:50:01 CST 2018
pZxid = 0xb0000001c
cversion = 3
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 3
[zk: localhost:2181(CONNECTED) 8] delete /workspace/super/testNode
[zk: localhost:2181(CONNECTED) 9] create /workspace/super/testNode test-data
Created /workspace/super/testNode
[zk: localhost:2181(CONNECTED) 10]

此時控制臺輸出內容如下:

當前客戶端的狀態:連接中...
節點初始化數據為:new-data
/super/testNode 節點的數據發生變化,最新的數據為:change-data
/super/testNode 節點的數據發生變化,最新的數據為:change-agin-data
獲取節點數據異常,無法獲取當前緩存的節點數據,可能該節點已被刪除
/super/testNode 節點的數據發生變化,最新的數據為:test-data
當前客戶端的狀態:已關閉...

從控制臺輸出的內容可以看到,只要數據發生改變了都會觸發這個事件,並且是可以重復觸發的,而不是一次性的。


curator之PathChildrenCache子節點監聽

使用NodeCache雖然能實現一次註冊n次監聽,但是卻只能監聽一個nodeChanged事件,也就是說創建、刪除以及子節點的事件都無法監聽。如果我們要監聽某一個節點的子節點的事件,或者監聽某一個特定節點的增刪改事件都需要借助PathChildrenCache來實現。從名稱上可以看到,PathChildrenCache也是用緩存實現的,並且也是一次註冊n次監聽。當我們傳遞一個節點路徑時是監聽該節點下的子節點事件,如果我們要限制監聽某一個節點,只需要加上判斷條件即可。

我們這裏演示簡單子節點事件的監聽,修改 CuratorConnect 類的main方法代碼如下:

...
public class CuratorConnect {
    ...
    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 父節點路徑
        String nodePath = "/super/testNode";

        // 為子節點添加watcher
        // PathChildrenCache: 監聽數據節點的增刪改,可以設置觸發的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, nodePath, true);

        /**
         * StartMode: 初始化方式
         * POST_INITIALIZED_EVENT:異步初始化,初始化之後會觸發事件
         * NORMAL:異步初始化
         * BUILD_INITIAL_CACHE:同步初始化
         */
        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        // 列出子節點數據列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能獲得,異步是獲取不到的
        List<ChildData> childDataList = childrenCache.getCurrentData();
        System.out.println("當前節點的子節點詳細數據列表:");
        for (ChildData childData : childDataList) {
            System.out.println("\t* 子節點路徑:" + new String(childData.getPath()) + ",該節點的數據為:" + new String(childData.getData()));
        }

        // 添加事件監聽器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通過判斷event type的方式來實現不同事件的觸發
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子節點初始化時觸發
                    System.out.println("\n--------------\n");
                    System.out.println("子節點初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子節點時觸發
                    System.out.println("\n--------------\n");
                    System.out.print("子節點:" + event.getData().getPath() + " 添加成功,");
                    System.out.println("該子節點的數據為:" + new String(event.getData().getData()));
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 刪除子節點時觸發
                    System.out.println("\n--------------\n");
                    System.out.println("子節點:" + event.getData().getPath() + " 刪除成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子節點數據時觸發
                    System.out.println("\n--------------\n");
                    System.out.print("子節點:" + event.getData().getPath() + " 數據更新成功,");
                    System.out.println("子節點:" + event.getData().getPath() + " 新的數據為:" + new String(event.getData().getData()));
                }
            }
        });

        Thread.sleep(200000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

運行該類,然後到zookeeper服務器上執行如下操作:

[zk: localhost:2181(CONNECTED) 0] create /workspace/super/testNode/fourNode four-node-data
Created /workspace/super/testNode/fourNode
[zk: localhost:2181(CONNECTED) 1] set /workspace/super/testNode/oneNode change-node-data
cZxid = 0xc00000002
ctime = Sun Apr 29 18:23:57 CST 2018
mZxid = 0xc00000023
mtime = Sun Apr 29 20:16:22 CST 2018
pZxid = 0xc00000002
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] delete /workspace/super/testNode/fourNode
[zk: localhost:2181(CONNECTED) 5] create /workspace/super/testNode/fiveNode five-node-data
Created /workspace/super/testNode/fiveNode
[zk: localhost:2181(CONNECTED) 6] 

此時,控制臺輸出內容如下:

當前客戶端的狀態:連接中...
當前節點的子節點詳細數據列表:
    * 子節點路徑:/super/testNode/oneNode,該節點的數據為:one-node-data
    * 子節點路徑:/super/testNode/threeNode,該節點的數據為:three-node-data
    * 子節點路徑:/super/testNode/twoNode,該節點的數據為:two-node-data

--------------

子節點:/super/testNode/fourNode 添加成功,該子節點的數據為:four-node-data

--------------

子節點:/super/testNode/oneNode 數據更新成功,子節點:/super/testNode/oneNode 新的數據為:change-node-data

--------------

子節點:/super/testNode/fourNode 刪除成功

--------------

子節點:/super/testNode/fiveNode 添加成功,該子節點的數據為:five-node-data
當前客戶端的狀態:已關閉...

以上的演示例子中為了獲取子節點列表,所以我們的代碼使用的是同步初始化模式。如果使用異步初始化是獲取不到子節點列表的,例如修改childrenCache.start代碼如下:

childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

這種模式會在初始化時觸發子節點初始化以及添加子節點這兩個事件,將初始化模式修改成POST_INITIALIZED_EVENT後,運行該類,此時控制臺輸出信息如下:

當前客戶端的狀態:連接中...
當前節點的子節點詳細數據列表:

--------------

子節點:/super/testNode/threeNode 添加成功,該子節點的數據為:three-node-data

--------------

子節點:/super/testNode/twoNode 添加成功,該子節點的數據為:two-node-data

--------------

子節點:/super/testNode/fiveNode 添加成功,該子節點的數據為:five-node-data

--------------

子節點:/super/testNode/oneNode 添加成功,該子節點的數據為:change-node-data

--------------

子節點:/super/testNode/fourNode 添加成功,該子節點的數據為:four-node-data

--------------

子節點初始化成功
當前客戶端的狀態:已關閉...

從控制臺輸出信息中可以看到,在這種模式下,子節點列表並沒有被獲取出來。除此之外,會觸發添加子節點事件以及子節點初始化事件。因為緩存初始化時是把子節點添加到緩存裏,所以會觸發添加子節點事件,而添加完成之後,就會觸發子節點初始化完成事件。

我們再來看看另一種異步初始化的模式:NORMAL模式,在這種模式下,同樣的無法獲取子節點列表,並且也會觸發添加子節點事件,但是不會觸發子節點初始化完成事件。修改childrenCache.start代碼如下:

childrenCache.start(PathChildrenCache.StartMode.NORMAL);

將初始化模式修改成NORMAL後,運行該類,此時控制臺輸出信息如下:

當前客戶端的狀態:連接中...
當前節點的子節點詳細數據列表:

--------------

子節點:/super/testNode/threeNode 添加成功,該子節點的數據為:three-node-data

--------------

子節點:/super/testNode/twoNode 添加成功,該子節點的數據為:two-node-data

--------------

子節點:/super/testNode/fiveNode 添加成功,該子節點的數據為:five-node-data

--------------

子節點:/super/testNode/oneNode 添加成功,該子節點的數據為:change-node-data

--------------

子節點:/super/testNode/fourNode 添加成功,該子節點的數據為:four-node-data
當前客戶端的狀態:已關閉...

從控制臺輸出信息中可以看到,在這種模式下,子節點列表並沒有被獲取出來。除此之外,還會觸發添加子節點事件。通常使用異步初始化的情況下,都是使用POST_INITIALIZED_EVENT模式,NORMAL較為少用。

如果我們想要監聽某一個特定的節點,例如我們要監聽/super/testNode這個節點,那麽以上面的代碼作為例子,就需要把nodePath改為/super。然後在每一個判斷條件中,再加上一個子判斷條件,將節點限定為/super/testNode才會觸發,這樣就能實現監聽某一個節點的增刪改事件了。如下示例:

...
public class CuratorConnect {
    ...

    private static final String PARENT_NODE_PATH = "/super";  // 父節點
    private static final String NODE_PATH = "/super/testNode";  // 需要被特定監聽的節點

    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorConnect curatorConnect = new CuratorConnect();
        // 獲取當前客戶端的狀態
        boolean isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));

        // 為子節點添加watcher
        // PathChildrenCache: 監聽數據節點的增刪改,可以設置觸發的事件
        final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, PARENT_NODE_PATH, true);

        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        // 添加事件監聽器
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // 通過判斷event type的方式來實現不同事件的觸發
                if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {  // 子節點初始化時觸發
                    System.out.println("\n--------------\n");
                    System.out.println("子節點初始化成功");
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {  // 添加子節點時觸發
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子節點:" + event.getData().getPath() + " 添加成功,");
                        System.out.println("該子節點的數據為:" + new String(event.getData().getData()));
                    }
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {  // 刪除子節點時觸發
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.println("子節點:" + event.getData().getPath() + " 刪除成功");
                    }
                } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {  // 修改子節點數據時觸發
                    if (event.getData().getPath().trim().equals(NODE_PATH)) {
                        System.out.println("\n--------------\n");
                        System.out.print("子節點:" + event.getData().getPath() + " 數據更新成功,");
                        System.out.println("子節點:" + event.getData().getPath() + " 新的數據為:" + new String(event.getData().getData()));
                    }
                }
            }
        });

        Thread.sleep(200000);

        // 關閉客戶端
        curatorConnect.closeZKClient();

        // 獲取當前客戶端的狀態
        isZkCuratorStarted = curatorConnect.client.isStarted();
        System.out.println("當前客戶端的狀態:" + (isZkCuratorStarted ? "連接中..." : "已關閉..."));
    }
}

這是最簡單粗暴的方式了,當然在實際開發中,肯定會寫得好一些,這個演示只是為了說明可以借助PathChildrenCache來實現某個特點節點的增刪改事件監聽。


zk-watcher應用實例之模擬統一更新N臺節點的配置文件

zookeeper有一個比較常見的應用場景就是統一管理、更新分布式集群環境中每個節點的配置文件,我們可以在代碼中監聽集群中的節點,當節點數據發生改變時就同步到其他節點上。如下圖:
技術分享圖片

因為我們使用的json作為節點存儲的數據格式,所以需要準備一個工具類來做json與pojo對象的一個轉換,也就是所謂的反序列化。創建一個 JsonUtils 類,代碼如下:

package org.zero01.zk.util;

import java.util.List;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * @Title: JsonUtils.java
 * @Package org.zero01.zk.util
 * @Description: JSON/對象轉換類
 */
public class JsonUtils {

    // 定義jackson對象
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * 將對象轉換成json字符串。
     * <p>Title: pojoToJson</p>
     * <p>Description: </p>
     * @param data
     * @return
     */
    public static String objectToJson(Object data) {
        try {
            String string = MAPPER.writeValueAsString(data);
            return string;
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 將json結果集轉化為對象
     *
     * @param jsonData json數據
     * @param beanType 對象中的object類型
     * @return
     */
    public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
        try {
            T t = MAPPER.readValue(jsonData, beanType);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 將json數據轉換成pojo對象list
     * <p>Title: jsonToList</p>
     * <p>Description: </p>
     * @param jsonData
     * @param beanType
     * @return
     */
    public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
        JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
        try {
            List<T> list = MAPPER.readValue(jsonData, javaType);
            return list;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

創建一個pojo類,封裝json格式的數據:

package org.zero01.zk.util;

public class RedisConfig {

    private String type;    // add 新增配置 update 更新配置 delete 刪除配置
    private String url;        // 如果是add或update,則提供下載地址
    private String remark;    // 備註
    ... gtter stter 略 ...
}

然後創建客戶端類,客戶端類就是用來監聽集群中的節點的。由於是模擬,所以這裏的部分代碼是偽代碼。客戶端類我們這裏創建了三個,因為集群中有三個節點,由於代碼基本上是一樣的,所以這裏只貼出客戶端_1的代碼。如下:

package org.zero01.zk.checkconfig;

import java.util.concurrent.CountDownLatch;

import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;

import org.zero01.zk.util.JsonUtils;
import org.zero01.zk.util.RedisConfig;

public class Client_1 {

    public CuratorFramework client = null;
    public static final String zkServerIp = "192.168.190.128:2181";

    // 初始化重連策略以及客戶端對象並啟動
    public Client_1() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerIp)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    // 關閉客戶端
    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    //  public final static String CONFIG_NODE = "/super/testNode/redis-config";
    public final static String CONFIG_NODE_PATH = "/super/testNode";
    public final static String SUB_PATH = "/redis-config";
    public static CountDownLatch countDown = new CountDownLatch(1);  // 計數器

    public static void main(String[] args) throws Exception {
        Client_1 cto = new Client_1();
        System.out.println("client1 啟動成功...");

        // 開啟子節點緩存
        final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
        childrenCache.start(StartMode.BUILD_INITIAL_CACHE);

        // 添加子節點監聽事件
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                // 監聽節點的數據更新事件
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
                    String configNodePath = event.getData().getPath();
                    if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
                        System.out.println("監聽到配置發生變化,節點路徑為:" + configNodePath);

                        // 讀取節點數據
                        String jsonConfig = new String(event.getData().getData());
                        System.out.println("節點" + CONFIG_NODE_PATH + "的數據為: " + jsonConfig);

                        // 從json轉換配置
                        RedisConfig redisConfig = null;
                        if (StringUtils.isNotBlank(jsonConfig)) {
                            redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
                        }

                        // 配置不為空則進行相應操作
                        if (redisConfig != null) {
                            String type = redisConfig.getType();
                            String url = redisConfig.getUrl();
                            String remark = redisConfig.getRemark();
                            // 判斷事件
                            if (type.equals("add")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("監聽到新增的配置,準備下載...");
                                // ... 連接ftp服務器,根據url找到相應的配置
                                Thread.sleep(500);
                                System.out.println("開始下載新的配置文件,下載路徑為<" + url + ">");
                                // ... 下載配置到你指定的目錄
                                Thread.sleep(1000);
                                System.out.println("下載成功,已經添加到項目中");
                                // ... 拷貝文件到項目目錄
                            } else if (type.equals("update")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("監聽到更新的配置,準備下載...");
                                // ... 連接ftp服務器,根據url找到相應的配置
                                Thread.sleep(500);
                                System.out.println("開始下載配置文件,下載路徑為<" + url + ">");
                                // ... 下載配置到你指定的目錄
                                Thread.sleep(1000);
                                System.out.println("下載成功...");
                                System.out.println("刪除項目中原配置文件...");
                                Thread.sleep(100);
                                // ... 刪除原文件
                                System.out.println("拷貝配置文件到項目目錄...");
                                // ... 拷貝文件到項目目錄
                            } else if (type.equals("delete")) {
                                System.out.println("\n-------------------\n");
                                System.out.println("監聽到需要刪除配置");
                                System.out.println("刪除項目中原配置文件...");
                            }
                            // TODO 視情況統一重啟服務
                        }
                    }
                }
            }
        });

        countDown.await();

        cto.closeZKClient();
    }
}

完成以上代碼的編寫後,將所有的客戶類都運行起來。然後到zookeeper服務器上,進行如下操作:

[zk: localhost:2181(CONNECTED) 14] set /workspace/super/testNode/redis-config {"type":"add","url":"ftp://192.168.10.123/config/redis.xml","remark":"add"}
cZxid = 0xc00000039
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000043
mtime = Mon Apr 30 01:52:35 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 75
numChildren = 0
[zk: localhost:2181(CONNECTED) 15] set /workspace/super/testNode/redis-config {"type":"update","url":"ftp://192.168.10.123/config/redis.xml","remark":"update"}
cZxid = 0xc00000039
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000044
mtime = Mon Apr 30 01:53:46 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 81
numChildren = 0
[zk: localhost:2181(CONNECTED) 16] set /workspace/super/testNode/redis-config {"type":"delete","url":"","remark":"delete"}   
cZxid = 0xc00000039               
ctime = Mon Apr 30 01:43:47 CST 2018
mZxid = 0xc00000045
mtime = Mon Apr 30 01:54:06 CST 2018
pZxid = 0xc00000039
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 44
numChildren = 0
[zk: localhost:2181(CONNECTED) 17] 

此時,三個客戶端的控制臺輸出信息如下:
技術分享圖片
技術分享圖片
技術分享圖片

如上,從三個客戶端的控制臺輸出信息可以看到,三個節點都進行了同樣操作,觸發了同樣的watch事件,這樣就可以完成統一的配置文件管理。


curator之acl權限操作與認證授權

以上我們介紹了curator對節點進行增刪查改以及註冊watch事件的操作,最後我們來演示一下,使用curator如何對節點的acl權限進行操作以及與zk服務端建立連接登錄用戶實現認證授權。

我們先演示在創建節點時設置acl權限,現在/workspace/super只有如下節點:

[zk: localhost:2181(CONNECTED) 27] ls /workspace/super
[xxxnode, testNode]
[zk: localhost:2181(CONNECTED) 28]

然後新建一個 CuratorAcl 類,關於acl權限的概念以及部分API代碼都在之前的zk原生API使用一文中介紹過了,所以這裏就不贅述了。編寫代碼如下:

package org.zero01.zk.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.zero01.zk.util.AclUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * @program: zookeeper-connection
 * @description: curator操作zk節點acl權限演示demo
 * @author: 01
 * @create: 2018-04-29 19:53
 **/
public class CuratorAcl {

    // Curator客戶端
    public CuratorFramework client = null;
    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";

    public CuratorAcl() {
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder().authorization("digest", "user1:123456a".getBytes())  // 認證授權,登錄用戶
                .connectString(zkServerIps)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        client.start();
    }

    public void closeZKClient() {
        if (client != null) {
            this.client.close();
        }
    }

    public static void main(String[] args) throws Exception {

        // 實例化
        CuratorAcl cto = new CuratorAcl();
        boolean isZkCuratorStarted = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連接中" : "已關閉"));

        String nodePath = "/super/testAclNode/testOne";

        // 自定義權限列表
        List<ACL> acls = new ArrayList<ACL>();
        Id user1 = new Id("digest", AclUtils.getDigestUserPwd("user1:123456a"));
        Id user2 = new Id("digest", AclUtils.getDigestUserPwd("user2:123456b"));
        acls.add(new ACL(ZooDefs.Perms.ALL, user1));
        acls.add(new ACL(ZooDefs.Perms.READ, user2));
        acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE, user2));

        // 創建節點,使用自定義權限列表來設置節點的acl權限
        byte[] nodeData = "child-data".getBytes();
        cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(nodePath, nodeData);

        cto.closeZKClient();
        boolean isZkCuratorStarted2 = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連接中" : "已關閉"));
    }
}

運行該類,然後到zookeeper服務器上,通過命令行進行如下操作:

[zk: localhost:2181(CONNECTED) 19] ls /workspace/super/testAclNode    
[testOne]
[zk: localhost:2181(CONNECTED) 20] getAcl /workspace/super/testAclNode
‘world,‘anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 21] getAcl /workspace/super/testAclNode/testOne
‘digest,‘user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
: cdrwa
‘digest,‘user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
: r
‘digest,‘user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
: cd
[zk: localhost:2181(CONNECTED) 22] 

可以看到,當遞歸創建節點時,只會對最末端的子節點賦予自定義的acl權限,父節點都是zk默認的匿名權限。

如果想要在遞歸創建節點時,父節點和子節點的acl權限都是我們自定義的權限,那麽就需要在withACL方法中,傳遞一個true,表示遞歸創建時所有節點的權限,都是我們設置的權限。修改代碼如下:

// 創建節點,使用自定義權限列表來設置節點的acl權限
byte[] nodeData = "child-data".getBytes();
cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls, true).forPath(nodePath, nodeData);

運行該類,然後到zookeeper服務器上,通過命令行進行如下操作:

[zk: localhost:2181(CONNECTED) 28] getAcl /workspace/super/testAclNodeTwo
‘digest,‘user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
: cdrwa
‘digest,‘user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
: r
‘digest,‘user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
: cd
[zk: localhost:2181(CONNECTED) 30] getAcl /workspace/super/testAclNodeTwo/testOne
‘digest,‘user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
: cdrwa
‘digest,‘user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
: r
‘digest,‘user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
: cd
[zk: localhost:2181(CONNECTED) 31] 

如上,可以看到,創建的全部節點的acl權限都是我們設置的自定義權限。

最後我們再來演示如何修改一個已存在的節點的acl權限,修改 CuratorAcl 類中的main方法代碼如下:

    public static void main(String[] args) throws Exception {
        // 實例化
        CuratorAcl cto = new CuratorAcl();
        boolean isZkCuratorStarted = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連接中" : "已關閉"));

        String nodePath = "/super/testAclNodeTwo/testOne";

        // 自定義權限列表
        List<ACL> acls = new ArrayList<ACL>();
        Id user1 = new Id("digest", AclUtils.getDigestUserPwd("user1:123456a"));
        Id user2 = new Id("digest", AclUtils.getDigestUserPwd("user2:123456b"));
        acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.CREATE | ZooDefs.Perms.ADMIN, user1));
        acls.add(new ACL(ZooDefs.Perms.READ | ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE, user2));

        // 設置指定節點的acl權限
        cto.client.setACL().withACL(acls).forPath(nodePath);

        cto.closeZKClient();
        boolean isZkCuratorStarted2 = cto.client.isStarted();
        System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連接中" : "已關閉"));
    }

運行該類,然後到zookeeper服務器上,通過命令行進行如下操作:

[zk: localhost:2181(CONNECTED) 31] getAcl /workspace/super/testAclNodeTwo/testOne
‘digest,‘user1:TQYTqd46qVVbWpOd02tLO5qb+JM=
: cra
‘digest,‘user2:CV4ED0rE6SxA3h/DN/WyScDMbCs=
: cdr
[zk: localhost:2181(CONNECTED) 32] 

可以看到,成功修改了該節點的acl權限。

Apache Curator操作zookeeper的API使用