1. 程式人生 > 其它 >ZooKeeper Java客戶端

ZooKeeper Java客戶端

ZooKeeper Java客戶端。

這裡介紹兩個客戶端,一個為 ZooKeeper 原生客戶端,一個為 ZkClient。

首先建立一個maven專案,pom檔案中新增依賴。

<!-- 原生 -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
</dependency>
<!-- zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

一、ZooKeeper原生客戶端

之前安裝過程看到 ZooKeeper 依賴 jdk 環境,可以看出它的原始碼使用了Java語言基礎,學習原生客戶端對於以後看原始碼有幫助,接下來看一看使用方式。

1. 建立會話

/**
 * @Author SunnyBear
 * @Description 建立Session
 */
public class TestCreateSession {

    /**
     * ZooKeeper服務地址
     */
    private final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

    /**
     * 會話超時時間
     */
    private static final int SESSION_TIMEOUT = 30000;

    public static void main(String[] args) throws IOException, InterruptedException {
        new TestCreateSession().testSession1();
        System.out.println("--------------------------華麗的分割線--------------------------");
        new TestCreateSession().testSession2();
    }

    /**
     * 獲得session的方式,這種方式可能會在ZooKeeper還沒有獲得連線的時候就已經對ZK進行訪問了
     * 測試可以發現連線狀態為CONNECTING,而不是CONNECTED
     */
    public void testSession1() throws IOException {
        ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, null);
        System.out.println("zooKeeper: " + zooKeeper);
        System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
    }

    /**
     * 發令槍
     */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 使用發令槍對獲得Session的方式進行優化,在ZooKeeper初始化完成以前先等待,等待完成後再進行後續操作
     */
    public void testSession2() throws IOException, InterruptedException {
        ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // 狀態為已連線時才進行後續操作
                    connectedSemaphore.countDown();
                    System.out.println("狀態為已連線。。");
                }
            }
        });
        connectedSemaphore.await();
        System.out.println("zooKeeper: " + zooKeeper);
        System.out.println("zooKeeper.getState(): " + zooKeeper.getState());
    }
}

2. 基本操作

/**
 * @Author SunnyBear
 * @Description 基本操作
 */
public class TestJavaApi implements Watcher {

    private static final int SESSION_TIMEOUT = 10000;
    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
    private static final String ZK_PATH = "/SunnyBearApiTest";
    private static final String ZK_DATA = "我是SunnyBearApiTest的資料";
    private ZooKeeper zk = null;

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * 建立連線
     * @param server zk伺服器地址列表
     * @param sessionTimeout session超時時間
     */
    public void createConnection(String server, int sessionTimeout) {
        try {
            zk = new ZooKeeper(server, sessionTimeout, this);
            connectedSemaphore.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 收到來自Server的Watcher通知,然後呼叫的方法處理
     */
    public void process(WatchedEvent watchedEvent) {
        System.out.println("收到事件通知:" + watchedEvent);
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            connectedSemaphore.countDown();
        }
    }

    /**
     * 關閉連線
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 建立節點
     * @param path 節點path
     * @param data 初始資料內容
     * @return 是否建立成功
     */
    public boolean createPath(String path, String data) {
        try {
            /**
             * ZooDefs.Ids.OPEN_ACL_UNSAFE:節點許可權
             * CreateMode.EPHEMERAL:節點型別為臨時節點
             */
            String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("建立節點path:" + createPath);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 讀取指定節點資料
     * @param path 節點path
     * @return 節點內容
     */
    public String readData(String path) {
        try {
            String data = new String(this.zk.getData(path, false, null));
            System.out.println("讀取資料成功path:" + path + ",data:" + data);
            return data;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * 更新指定節點資料
     * @param path 節點path
     * @param data 資料data
     * @return 是否成功
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println("更新資料成功,path:" + path + ", stat: "
                    + this.zk.setData(path, data.getBytes(), -1));
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 刪除節點
     * @param path 節點path
     * @return 是否成功
     */
    public boolean deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println("刪除節點成功,path:" + path);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public static void main(String[] args) {
        TestJavaApi testJavaApi = new TestJavaApi();
        testJavaApi.createConnection(SERVER, SESSION_TIMEOUT);
        if (testJavaApi.createPath(ZK_PATH, ZK_DATA)) {
            System.out.println("建立後資料內容:" + testJavaApi.readData(ZK_PATH));
            testJavaApi.writeData(ZK_PATH, "我是SunnyBearApiTest修改後的資料");
            System.out.println("更新後資料內容:" + testJavaApi.readData(ZK_PATH));
            testJavaApi.deleteNode(ZK_PATH);
        }
        testJavaApi.releaseConnection();
    }
}

3. 監聽機制

Zookeeper採用了Watcher機制實現資料的釋出/訂閱功能。該機制在被訂閱物件發生變化時會非同步通知客戶端,因此客戶端不必在Watcher註冊後輪詢阻塞,從而減輕了客戶端壓力,Watcher是一次性的,如果被觸發了需要重新註冊。

/**
 * @Author SunnyBear
 * @Description 監聽機制
 */
public class TestWatcher implements Watcher {

    private AtomicInteger seq = new AtomicInteger();
    private static final int SESSION_TIMEOUT = 100000;
    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
    private static final String PARENT_PATH = "/testWatch";
    private static final String CHILDREN_PATH = "/testWatch/children";
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private ZooKeeper zk = null;

    /**
     * 建立連線
     * @param server zk伺服器地址列表
     * @param sessionTimeout session超時時間
     */
    public void createConnection(String server, int sessionTimeout) {
        try {
            zk = new ZooKeeper(server, sessionTimeout, this);
            connectedSemaphore.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 關閉連線
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 建立節點
     * @param path 節點path
     * @param data 初始資料內容
     * @return 是否建立成功
     */
    public boolean createPath(String path, String data) {
        try {
            // 設定監控(由於zookeeper的監控都是一次性的所以 每次必須設定監控)
            this.zk.exists(path, true);
            // 建立持久節點
            String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("建立節點path:" + createPath);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 讀取指定節點資料
     * @param path 節點path
     * @param needWatch  表示是否需要註冊一個watcher。true:註冊預設watcher,false:不需要註冊watcher
     * @return 節點內容
     */
    public String readData(String path, boolean needWatch) {
        try {
            String data = new String(this.zk.getData(path, needWatch, null));
            System.out.println("讀取資料成功path:" + path + ",data:" + data);
            return data;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * 更新指定節點資料
     * @param path 節點path
     * @param data 資料data
     * @return 是否成功
     */
    public boolean writeData(String path, String data) {
        try {
            System.out.println("更新資料成功,path:" + path + ", stat: "
                    + this.zk.setData(path, data.getBytes(), -1));
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 刪除節點
     * @param path 節點path
     * @return 是否成功
     */
    public boolean deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println("刪除節點成功,path:" + path);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 判斷指定節點是否存在
     * @param path 節點路徑
     * @param needWatch 表示是否需要註冊一個watcher
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 獲取子節點
     * @param path 節點路徑
     * @param needWatch 表示是否需要註冊一個watcher
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 刪除測試用的所有節點
     */
    public void deleteAllTestPath() {
        if(this.exists(CHILDREN_PATH, false) != null){
            this.deleteNode(CHILDREN_PATH);
        }
        if(this.exists(PARENT_PATH, false) != null){
            this.deleteNode(PARENT_PATH);
        }
    }

    /**
     * 收到來自Server的Watcher通知,然後呼叫的方法處理
     */
    public void process(WatchedEvent watchedEvent) {
        System.out.println("收到事件通知:" + watchedEvent);
        try {
            Thread.sleep(200);
            if (watchedEvent == null) {
                return;
            }
            // 連線狀態
            Event.KeeperState eventState = watchedEvent.getState();
            // 事件型別
            Event.EventType eventType = watchedEvent.getType();
            // 受影響的path
            String eventPath = watchedEvent.getPath();
            // 列印一下監聽的資訊
            String eventPrefix = "[Watch - " + this.seq.incrementAndGet() + "] ";
            System.out.println(eventPrefix + "收到Watcher通知");
            System.out.println(eventPrefix + "連線狀態:\t" + eventState.toString());
            System.out.println(eventPrefix + "事件型別:\t" + eventType.toString());

            if (Event.KeeperState.SyncConnected == eventState) {
                switch (eventType) {
                    case None:
                        System.out.println(eventPrefix + "成功連線zk伺服器");
                        connectedSemaphore.countDown();
                        break;
                    case NodeCreated:
                        System.out.println(eventPrefix + "節點建立");
                        Thread.sleep(200);
                        this.zk.exists(eventPath, true);
                        break;
                    case NodeDataChanged:
                        System.out.println(eventPrefix + "節點資料更新");
                        Thread.sleep(200);
                        System.out.println(eventPrefix + "資料內容: " + this.readData(PARENT_PATH, true));
                        break;
                    case NodeChildrenChanged:
                        System.out.println(eventPrefix + "子節點變更");
                        Thread.sleep(3000);
                        System.out.println(eventPrefix + "子節點列表:" + this.getChildren(PARENT_PATH, true));
                        break;
                    case NodeDeleted:
                        System.out.println(eventPrefix + "節點 " + eventPath + " 被刪除");
                        break;
                    default:
                        break;
                }
            } else if (Watcher.Event.KeeperState.Disconnected == eventState) {
                System.out.println(eventPrefix + "與ZK伺服器斷開連線");
            } else if (Watcher.Event.KeeperState.AuthFailed == eventState) {
                System.out.println(eventPrefix + "許可權檢查失敗");
            } else if (Watcher.Event.KeeperState.Expired == eventState) {
                System.out.println(eventPrefix + "會話失效");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("-------------------------------------------------------------");
    }

    public static void main(String[] args) throws InterruptedException {
        TestWatcher zkWatch = new TestWatcher();
        // 建立連線
        zkWatch.createConnection(SERVER, SESSION_TIMEOUT);

        Thread.sleep(1000);

        // 刪除所有節點
        zkWatch.deleteAllTestPath();

        if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
            /**
             * 讀取資料,在操作節點資料之前先呼叫zookeeper的getData()方法是為了可以watch到對節點的操作。watch是一次性的
             * 也就是說,如果第二次又重新呼叫了setData()方法,在此之前需要重新呼叫一次
             */
            System.out.println("------------------------讀取PARENT------------------------");
            zkWatch.readData(PARENT_PATH, true);
            // 更新資料
            zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            /**
             * 讀取子節點,設定對子節點變化的watch,如果不寫該方法,則在建立子節點是隻會輸出NodeCreated,而不會輸出NodeChildrenChanged,
             * 也就是說建立子節點時沒有watch,
             * 如果是遞迴的建立子節點,如path="/p/c1/c2"的話,getChildren(PARENT_PATH, ture)只會在建立c1時watch,輸出c1的NodeChildrenChanged,
             * 而不會輸出建立c2時的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,則需要再呼叫一次getChildren(String path, true)方法,
             * 其中path="/p/c1"
             */
            System.out.println("------------------------讀取CHILDREN------------------------");
            zkWatch.getChildren(PARENT_PATH, true);

            Thread.sleep(1000);
            // 建立子節點
            zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

            Thread.sleep(1000);

            zkWatch.readData(CHILDREN_PATH, true);
            zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
        }

        Thread.sleep(20000);
        // 清理節點
        zkWatch.deleteAllTestPath();
        Thread.sleep(1000);
        zkWatch.releaseConnection();
    }
}

二、ZkClient

1. 基本操作

/**
 * @Author SunnyBear
 * @Description ZkClient基本操作
 */
public class TestZkClientApi {

    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

    private static final int SESSION_TIMEOUT = 30000;

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient(SERVER, SESSION_TIMEOUT);

        // 建立臨時節點,值為null
        zkClient.createEphemeral("/zkTemp");
        // 建立持久節點,,值為null,如果父節點不存在則建立
        zkClient.createPersistent("/zkPersistent/zk1", true);
        // 建立持久節點,有值
        zkClient.createPersistent("/zkPersistent/zk2", "zk2內容");
        zkClient.createPersistent("/zkPersistent/zk3", "zk3內容");

        // 查詢節點下面的所有節點
        List<String> childrenList = zkClient.getChildren("/zkPersistent");
        for (String p : childrenList) {
            String childrenPath = "/zkPersistent/" + p;
            // 查詢節點資料
            String data = zkClient.readData(childrenPath);
            System.out.println(childrenPath + ":" + data);
        }

        // 修改節點資料
        zkClient.writeData("/zkPersistent/zk1", "給zk1更新了內容");
        System.out.println(zkClient.readData("/zkPersistent/zk1"));

        // 刪除節點
        zkClient.delete("/zkTemp");
        // 遞迴刪除,即包含子目錄的刪除
        zkClient.deleteRecursive("/zkPersistent");

        // 關閉連線
        zkClient.close();
    }
}

2. 監聽機制

/**
 * @Author SunnyBear
 * @Description ZkClient監聽的測試
 */
public class ZkClientWatcher {

    private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";

    private static final int SESSION_TIMEOUT = 30000;

    public static void main(String[] args) throws InterruptedException {
//        test1();
        test2();
    }

    /**
     * 訂閱子節點的變化
     */
    public static void test1() throws InterruptedException {
        ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);

        // 給父節點新增監聽子節點變化
        zkClient.subscribeChildChanges("/zkPersistent", new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChildList) throws Exception {
                System.out.println("parentPath:" + parentPath);
                System.out.println("currentChildList" + currentChildList);
            }
        });

        Thread.sleep(3000);

        zkClient.createPersistent("/zkPersistent");
        Thread.sleep(1000);

        zkClient.createPersistent("/zkPersistent/"+ "zk1", "zk1內容");
        Thread.sleep(1000);

        zkClient.createPersistent("/zkPersistent/" + "zk2", "zk2內容");
        Thread.sleep(1000);

        zkClient.delete("/super/c2");
        Thread.sleep(1000);

        zkClient.deleteRecursive("/super");
        Thread.sleep(10000);

        zkClient.close();
    }

    /**
     * 訂閱內容變化
     */
    public static void test2() throws InterruptedException {
        ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);

        zkClient.createPersistent("/zkPersistent", "zkPersistentData");

        // 對父節點新增監聽子節點變化。
        zkClient.subscribeDataChanges("/zkPersistent", new IZkDataListener() {

            public void handleDataDeleted(String path) throws Exception {
                System.out.println("刪除的節點為:" + path);
            }

            public void handleDataChange(String path, Object data) throws Exception {
                System.out.println("變更的節點為:" + path + ", 變更內容為:" + data);
            }
        });

        Thread.sleep(3000);
        zkClient.writeData("/zkPersistent", "zkPersistentDataUpdate", -1);

        Thread.sleep(1000);
        zkClient.delete("/zkPersistent");

        Thread.sleep(10000);
    }
}

都讀到這裡了,來個 點贊、評論、關注、收藏 吧!

文章作者:IT王小二
首發地址:https://www.itwxe.com/posts/5a33d634/
版權宣告:文章內容遵循 署名-非商業性使用-禁止演繹 4.0 國際 進行許可,轉載請在文章頁面明顯位置給出作者與原文連結。