1. 程式人生 > >使用ZooKeeper提供的Java API操作ZooKeeper

使用ZooKeeper提供的Java API操作ZooKeeper

zookeeper 服務協調框架 分布式 集群 Java API

建立客戶端與zk服務端的連接

我們先來創建一個普通的maven工程,然後在pom.xml文件中配置zookeeper依賴:

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.11</version>
    </dependency>
</dependencies>

在resources目錄下創建一個zk-connect.properties屬性配置文件,我們在該文件中填寫連接zookeeper服務器的一些配置信息。如下:

# zk.zkServerIp=192.168.190.129:2181  單機模式
zk.zkServerIps=192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181
zk.timeout=5000

註:我這裏使用的集群模式,所以是多個IP。

zookeeper使用的是log4j作為日誌打印工具,所以我們還需要在resources目錄下創建log4j的

log4j.rootLogger=WARN,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.encoding=UTF-8
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n

然後創建一個連接類demo,代碼如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.Properties;

/**
 * @Description: zookeeper 連接demo演示
 */
public class ZKConnect implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKConnect.class);

    // private static String zkServerIp;  單機模式是一個ip

    // 集群模式則是多個ip
    private static String zkServerIps;

    // 連接超時時間
    private static Integer timeout;

    // 加載配置信息
    static {
        Properties properties = new Properties();
        InputStream inputStream = Object.class.getResourceAsStream("/zk-connect.properties");
        try {
            properties.load(inputStream);

            // zkServerIp = properties.getProperty("zk.zkServerIp");
            zkServerIps = properties.getProperty("zk.zkServerIps");
            timeout = Integer.parseInt(properties.getProperty("zk.timeout"));
        } catch (Exception e) {
            logger.error("配置文件讀取異常", e);
        } finally {
            try {
                if (inputStream != null) {
                    inputStream.close();
                }
            } catch (IOException e) {
                logger.error("關閉流失敗", e);
            }
        }
    }

    // Watch事件通知
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        /**
         * 客戶端和zk服務端鏈接是一個異步的過程
         * 當連接成功後後,客戶端會收的一個watch通知
         *
         * 參數:
         * connectString:連接服務器的ip字符串,
         *      比如: "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181"
         *      可以是一個ip,也可以是多個ip,一個ip代表單機,多個ip代表集群
         *      也可以在ip後加路徑
         * sessionTimeout:超時時間,心跳收不到了,那就超時
         * watcher:通知事件,如果有對應的事件觸發,則會收到一個通知;如果不需要,那就設置為null
         * canBeReadOnly:可讀,當這個物理機節點斷開後,還是可以讀到數據的,只是不能寫,
         *                         此時數據被讀取到的可能是舊數據,此處建議設置為false,不推薦使用
         * sessionId:會話的id
         * sessionPasswd:會話密碼   當會話丟失後,可以依據 sessionId 和 sessionPasswd 重新獲取會話
         */

        // 實例化zookeeper客戶端
        ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnect());

        logger.warn("客戶端開始連接zookeeper服務器...");
        logger.warn("連接狀態:{}", zooKeeper.getState());

        // 避免發出連接請求就斷開,不然就無法正常連接也無法獲取watch事件的通知
        Thread.sleep(2000);

        logger.warn("連接狀態:{}", zooKeeper.getState());
    }
}

運行該類後,控制臺輸出日誌信息如下:

2018-04-25 10:41:32,488 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:76)] - [WARN] 客戶端開始連接zookeeper服務器...
2018-04-25 10:41:32,505 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:77)] - [WARN] 連接狀態:CONNECTING
2018-04-25 10:41:32,515 [main-EventThread] [org.zero01.zk.demo.ZKConnect.process(ZKConnect.java:52)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 10:41:34,507 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:81)] - [WARN] 連接狀態:CONNECTED

這樣,我們就完成了一個與zookeeper服務端建立連接的過程。


zk會話重連機制

上一節我們簡單演示了如何去連接zk服務端,本節則介紹一下,如何通過sessionid和session密碼去恢復上一次的會話,也就是zk的會話重連機制。

新建一個類,用做於演示zk會話重連機制的demo:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @program: zookeeper-connection
 * @description: zookeeper 恢復之前的會話連接demo演示
 * @author: 01
 * @create: 2018-04-25 12:59
 **/
public class ZKConnectSessionWatcher implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);

    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超時時間
    private static final Integer timeout = 5000;

    // Watch事件通知
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // 實例化zookeeper客戶端
        ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher());

        logger.warn("客戶端開始連接zookeeper服務器...");
        logger.warn("連接狀態:{}", zooKeeper.getState());
        Thread.sleep(2000);
        logger.warn("連接狀態:{}", zooKeeper.getState());

        // 記錄本次會話的sessionId
        long sessionId = zooKeeper.getSessionId();
        // 轉換成16進制進行打印
        logger.warn("sid:{}", "0x" + Long.toHexString(sessionId));
        // 記錄本次會話的session密碼
        byte[] sessionPassword = zooKeeper.getSessionPasswd();

        Thread.sleep(200);

        // 開始會話重連
        logger.warn("開始會話重連...");
        // 加上sessionId和password參數去實例化zookeeper客戶端
        ZooKeeper zkSession = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword);
        logger.warn("重新連接狀態zkSession:{}", zkSession.getState());
        Thread.sleep(2000);
        logger.warn("重新連接狀態zkSession:{}", zkSession.getState());
    }
}

運行該類,控制臺輸出日誌結果如下:

2018-04-25 13:48:00,931 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:35)] - [WARN] 客戶端開始連接zookeeper服務器...
2018-04-25 13:48:00,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:36)] - [WARN] 連接狀態:CONNECTING
2018-04-25 13:48:00,951 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:38)] - [WARN] 連接狀態:CONNECTED
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:43)] - [WARN] sid:0x10000e81cfa0002
2018-04-25 13:48:03,136 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:50)] - [WARN] 開始會話重連...
2018-04-25 13:48:03,137 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:53)] - [WARN] 重新連接狀態zkSession:CONNECTING
2018-04-25 13:48:03,142 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:05,140 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:55)] - [WARN] 重新連接狀態zkSession:CONNECTED

同步/異步創建zk節點

以上我們介紹了如何去連接和重連zk服務端,既然知道如何連接zk服務端之後,我們來看一下如何,同步或異步去創建zk節點。

先演示同步創建zk節點的方式,創建一個demo類如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * @program: zookeeper-connection
 * @description:  演示同步異步創建zk節點
 * @author: 01
 * @create: 2018-04-25 13:51
 **/
public class ZkNodeOperator implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZkNodeOperator.class);

    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超時時間
    private static final Integer timeout = 5000;
    private ZooKeeper zooKeeper;

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public ZkNodeOperator() {
    }

    public ZkNodeOperator(String connectStr) {
        try {
            // 在使用該構造器的時候,實例化zk客戶端對象
            zooKeeper = new ZooKeeper(connectStr, timeout, new ZkNodeOperator());
        } catch (IOException e) {
            e.printStackTrace();
            try {
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    /**
     * @Title: ZKOperatorDemo.java
     * @Description: 創建zk節點
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls) {
        String result = "";
        try {
            /**
             * 同步或者異步創建節點,都不支持子節點的遞歸創建,異步有一個callback函數
             * 參數:
             * path:節點創建的路徑
             * data:節點所存儲的數據的byte[]
             * acl:控制權限策略
             *          Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *          CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode:節點類型, 是一個枚舉
             *          PERSISTENT:持久節點
             *          PERSISTENT_SEQUENTIAL:持久順序節點
             *          EPHEMERAL:臨時節點
             *          EPHEMERAL_SEQUENTIAL:臨時順序節點
             */
            // 同步創建zk節點,節點類型為臨時節點
            result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
            System.out.println("創建節點:\t" + result + "\t成功...");
            Thread.sleep(2000);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ZkNodeOperator zkServer = new ZkNodeOperator(zkServerIps);

        // 創建zk節點
        zkServer.createZKNode("/testNode", "testNode-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }
}

運行該類,到服務器上查看是否已創建成功。如下,我這裏是創建成功的:

[root@zk001 ~]# zkCli.sh
[zk: localhost:2181(CONNECTED) 7] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 8] ls /
[zookeeper, data, real-culster]  # 因為是臨時節點,所以客戶端斷開之後就消失了
[zk: localhost:2181(CONNECTED) 9] quit
[root@zk001 ~]# 

控制臺輸出的日誌信息如下:

2018-04-25 14:16:47,726 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
創建節點:   /testNode   成功...

接下來我們演示一下異步創建zk節點的方式,因為異步創建有一個回調函數,所以我們得先創建一個類,實現StringCallback接口裏面的回調方法:

package org.zero01.zk.demo;

import org.apache.zookeeper.AsyncCallback.StringCallback;

public class CreateCallBack implements StringCallback {

    // 回調函數
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("創建節點:" + path);
        System.out.println((String) ctx);
    }
}

修改 ZkNodeOperator 類中的 createZKNode 方法代碼如下:

...
public class ZkNodeOperator implements Watcher {
    ...
    /**
     * @Title: ZKOperatorDemo.java
     * @Description: 創建zk節點
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls) {
        try {
            ...
            // 異步步創建zk節點,節點類型為持久節點
            String ctx = "{‘create‘:‘success‘}";
            zooKeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);

            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}    

運行該類,然後到服務器上查看是否已創建成功。如下,我這裏是創建成功的:

[zk: localhost:2181(CONNECTED) 9] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 10] get /testNode
testNode-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000014
mtime = Wed Apr 25 22:17:26 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0
[zk: localhost:2181(CONNECTED) 11] 

控制臺輸出的日誌信息如下:

2018-04-25 14:25:14,923 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
創建節點:/testNode
{‘create‘:‘success‘}

同步/異步修改zk節點數據

同樣的,我們也可以通過Zookeeper提供的Java API去修改zk節點的數據,也是有同步和異步兩種方式,先來演示同步的方式。創建一個新的類,代碼如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @program: zookeeper-connection
 * @description: 修改zk節點數據演示
 * @author: 01
 * @create: 2018-04-25 16:25
 **/
public class ZKNodeAlterOperator implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKNodeAlterOperator.class);

    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超時時間
    private static final Integer timeout = 5000;
    private ZooKeeper zooKeeper;

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public ZKNodeAlterOperator() {
    }

    public ZKNodeAlterOperator(String connectStr) {
        try {
            // 在使用該構造器的時候,實例化zk客戶端對象
            zooKeeper = new ZooKeeper(connectStr, timeout, new ZKNodeAlterOperator());
        } catch (IOException e) {
            e.printStackTrace();
            try {
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);

        /**
         * 修改zk節點數據(同步)
         * 參數:
         * path:節點路徑
         * data:新數據
         * version 數據版本
         */
        Stat status = zkServer.getZooKeeper().setData("/testNode", "this is new data".getBytes(), 0);
        // 通過Stat對象可以獲取znode所有的狀態屬性,這裏以version為例
        System.out.println("修改成功,當前數據版本為:" + status.getVersion());
    }
}

運行該類,到服務器上查看節點是否已成功修改數據。如下,我這裏是修改成功的:

[zk: localhost:2181(CONNECTED) 12] get /testNode
this is new data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000017
mtime = Thu Apr 26 00:21:54 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 0
[zk: localhost:2181(CONNECTED) 13] 

控制臺輸出的日誌信息如下:

2018-04-25 16:30:02,111 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:57)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改成功,當前數據版本為:1

接下來演示一下異步修改zk節點數據的方式,和異步創建節點是幾乎一樣的。也是需要新建一個類來實現回調接口的方法,只不過接口不一樣而已。如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;

public class AlterCallBack implements StatCallback {

    // 回調函數
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("修改節點:" + path + "成功...");
        // 通過Stat對象可以獲取znode所有的狀態屬性,這裏以version為例
        System.out.println("當前數據版本為:" + stat.getVersion());
        System.out.println((String) ctx);
    }
}

然後修改 ZKNodeAlterOperator 類中的main方法代碼如下:

...
public class ZKNodeAlterOperator implements Watcher {
    ...
    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);

        /**
         * 修改zk節點數據(異步)
         * 參數:
         * path:節點路徑
         * data:新數據
         * version: 數據版本
         * sc:實現回調函數的對象
         * ctx:給回調函數的上下文
         */
        String ctx = "{‘alter‘:‘success‘}";
        zkServer.getZooKeeper().setData("/testNode", "asynchronous-data".getBytes(), 0, new AlterCallBack(), ctx);

        Thread.sleep(2000);
    }
}

運行該類,到服務器上查看節點是否已成功修改數據。如下,我這裏是修改成功的:

[zk: localhost:2181(CONNECTED) 16] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x70000001a
mtime = Thu Apr 26 00:35:53 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 17] 

控制臺輸出的日誌信息如下:

2018-04-25 16:44:03,472 [main-EventThread] [org.zero01.zk.demo.ZKNodeAlterOperator.process(ZKNodeAlterOperator.java:58)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改節點:/testNode成功...
當前數據版本為:2
{‘alter‘:‘success‘}

同步/異步刪除zk節點

同樣的,刪除節點也有同步和異步兩種方式,在刪除節點操作上,使用異步會更人性化一些,因為有回調通知,同步的方式,除了設置了watch事件,不然是沒有通知的。我們先來看一下同步方式的刪除節點,代碼如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKNodeDeleteOperator implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKNodeDeleteOperator.class);

    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超時時間
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
        // 創建節點
        zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Thread.sleep(1000);
        /**
         * 刪除節點(同步)
         * 參數:
         * path:需要刪除的節點路徑
         * version:數據版本
         */
        zooKeeper.delete("/testDeleteNode", 0);

        zooKeeper.close();
    }
}

由於同步的刪除方法不會有返回值,所以我們無法在控制臺輸出內容。

然後再來看一下異步方式的刪除節點,首先需要新建一個類實現回調接口的方法:

package org.zero01.zk.demo;

import org.apache.zookeeper.AsyncCallback.VoidCallback;

public class DeleteCallBack implements VoidCallback {

    // 回調函數
    public void processResult(int rc, String path, Object ctx) {
        System.out.println("刪除節點:" + path + " 成功...");
        System.out.println((String) ctx);
    }
}

然後修改一下 ZKNodeDeleteOperator 類的main方法:

public class ZKNodeDeleteOperator implements Watcher {
    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
        // 創建節點
        zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Thread.sleep(1000);
        /**
         * 刪除節點(異步)
         * 參數:
         * path:需要刪除的節點路徑
         * version:數據版本
         * sc:實現回調函數的對象
         * ctx:給回調函數的上下文
         */
        String ctx = "{‘delete‘:‘success‘}";
        zooKeeper.delete("/testDeleteNode", 0, new DeleteCallBack(), ctx);
        Thread.sleep(2000);
        zooKeeper.close();
    }
}

運行該類,控制臺輸出結果如下:

刪除節點:/testDeleteNode 成功...
{‘delete‘:‘success‘}

獲取zk節點數據

以上小節介紹完了增刪改,現在就剩下查了。同樣的查詢也有同步和異步兩種方式,異步的方式在之前的增刪改例子中已經都介紹過了,在查詢裏使用異步也是和增刪改同樣的方式,所以就不再演示查詢的異步了。zk中有三種數據可以查詢:查詢zk節點數據、查詢zk子節點列表、查詢某個zk節點是否存在。本節先介紹如何查詢zk節點數據。

現在zookeeper服務器上,有一個/testNode節點。節點數據內容如下:

[zk: localhost:2181(CONNECTED) 3] get /testNode
asynchronous-data
...
[zk: localhost:2181(CONNECTED) 4] 

然後我們來編寫一個 ZKGetNodeData 類,調用zookeeper的API去獲取zk節點數據。代碼示例:

package org.zero01.zk.demo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @program: zookeeper-connection
 * @description: 獲取zk節點數據demo
 * @author: 01
 * @create: 2018-04-26 18:05
 **/
public class ZKGetNodeData implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKGetNodeData.class);

    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超時時間
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;
    private static Stat stat = new Stat();

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());

        /**
         * 參數:
         * path:節點路徑
         * watch:true或者false,註冊一個watch事件
         * stat:狀態,我們可以通過這個對象獲取節點的狀態信息
         */
        byte[] resByte = zooKeeper.getData("/testNode", true, stat);
        String result = new String(resByte);
        System.out.println("/testNode 節點的數據: " + result);

        zooKeeper.close();
    }
}

控制臺輸出結果如下:

/testNode 節點的值: asynchronous-data

通過實現 Watcher 接口的通知方法,再結合這個獲取節點數據的API,我們就可以在數據發生改變的時候獲取最新的數據。如下示例,在 ZKGetNodeData 類中,增加代碼如下:

...
public class ZKGetNodeData implements Watcher {
    ...
    // 計數器
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        try {
            if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
                byte[] resByte = zooKeeper.getData("/testNode", false, stat);
                String result = new String(resByte);
                System.out.println("/testNode 節點的數據發生了變化");
                System.out.println("新的數據為: " + result);
                System.out.println("新的數據版本號為:" + stat.getVersion());

                // 通知完之後,計數器減一
                countDownLatch.countDown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ...
        // 等待線程執行
        countDownLatch.await();
    }
}

這時候由於我們在main裏調用了await()方法,所以主線程會阻塞。然後我們到zookeeper服務器上,對該節點的數據進行操作,如下:

[zk: localhost:2181(CONNECTED) 11] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000011
mtime = Fri Apr 27 03:04:09 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 12] set /testNode new-data 6       
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000013
mtime = Fri Apr 27 03:04:35 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: localhost:2181(CONNECTED) 13]

當我們修改了數據之後,控制臺就會輸出如下內容,主線程就會解除阻塞結束執行:

/testNode 節點的數據: asynchronous-data
/testNode 節點的數據發生了變化
新的數據為: new-data
新的數據版本號為:7

獲取zk子節點列表

本節介紹一下如何獲取zk子節點列表,同樣的也是有同步和異步兩種方式,這裏介紹的是同步的。testNode節點下有三個節點,如下:

[zk: localhost:2181(CONNECTED) 20] ls /testNode
[ThreeNode, TwoNode, OneNode]
[zk: localhost:2181(CONNECTED) 21] 

我們來寫一個demo獲取這個節點下的子節點列表。代碼如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.List;

/**
 * @program: zookeeper-connection
 * @description:  zookeeper 獲取子節點數據的demo演示
 * @author: 01
 * @create: 2018-04-26 21:13
 **/
public class ZKGetChildrenList implements Watcher{

    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超時時間
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetChildrenList());

        /**
         * 參數:
         * path:父節點路徑
         * watch:true或者false,註冊一個watch事件
         */
        List<String> strChildList = zooKeeper.getChildren("/testNode", false);
        for (String s : strChildList) {
            System.out.println(s);
        }
    }
}

控制臺就會輸出內容如下:

ThreeNode
TwoNode
OneNode

判斷zk節點是否存在

最後介紹如何判斷一個zk節點是否存在,同樣的也是有同步和異步兩種方式,這裏介紹的是同步的。我們來寫一個demo判斷某個zk節點是否存在。代碼如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * @program: zookeeper-connection
 * @description: zookeeper 判斷節點是否存在demo
 * @author: 01
 * @create: 2018-04-26 22:06
 **/
public class ZKNodeExist implements Watcher {

    // 集群模式則是多個ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超時時間
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;

    public void process(WatchedEvent watchedEvent) {
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeExist());

        /**
         * 參數:
         * path:節點路徑
         * watch:true或者false,註冊一個watch事件
         */
        Stat stat = zooKeeper.exists("/testNode", true);
        if (stat != null) {
            System.out.println("testNode 節點存在...");
            System.out.println("該節點的數據版本為:" + stat.getVersion());
        } else {
            System.out.println("該節點不存在...");
        }
    }
}

運行該類,控制臺輸出如下:

testNode 節點存在...
該節點的數據版本為:7

將testNode換成一個不存在的節點,運行該類,控制臺輸出如下:

該節點不存在...

使用ZooKeeper提供的Java API操作ZooKeeper