1. 程式人生 > >2018-07-18:03-java操作zookeeper

2018-07-18:03-java操作zookeeper

  • 客戶端操作zookeeper學習完後,我們要使用java控制zookeeper。我們先用apache提供的zookeeper功能來操作zookeeper伺服器。
  • 我們建立的每個zookeeper都是一個客戶端連結

先看程式碼,然後解釋

1.建立一個maven專案,匯入Apache的zookeeper依賴

<dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.37</version>
        </dependency>
    </dependencies>

2.編寫客戶端連線

package com.zookeeper.test;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZookeeperConnection {
    private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    //連線過程:not connection connecting connected close
    public static void main(String[] args) throws Exception {
//        testConnecting();
        testConnected();
    }
    public static void testConnecting() throws Exception{
        ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
        System.out.println(zk.getState());
    }

    public static void testConnected() throws Exception{
        ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                    countDownLatch.countDown();
                    System.out.println(watchedEvent.getState());
                }

            }
        });
        countDownLatch.await();
        System.out.println(zk.getState());
    }
}
zookeeper連線過程

連線過程.png

  • testConnected方法輸出的是connected
    在使用countDounLatch的時候我們等待連線完成才輸出
  • testConnecting方法輸出的是connecting
    沒有使用執行緒等待,直接輸出。出現這種情況是因為連線是非同步的,主執行緒執行完畢,連線還在進行中。
    ###3.節點的增刪改查
public static void main(String[] args) throws Exception {
//        testConnecting();
        ZooKeeper zk = testConnected();
        //建立節點:節點路徑,節點攜帶的值,節點的許可權,建立持久化節點
        zk.create("/node","abc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //獲得節點內容:
        Stat stat = new Stat();
        byte[] data = zk.getData("/node",false,stat);
        System.out.println(new String(data));
        //修改節點
        zk.setData("/node","cba".getBytes(),-1);
        Stat stat1 = new Stat();
        byte[] data1 = zk.getData("/node",false,stat1);
        System.out.println(new String(data1));
        //刪除節點
        zk.delete("/node",-1);
    }

4.zookeeper的watch功能(重點)

watch類似訂閱釋出模式。監聽某個節點,當節點發生變化監聽節點就會收到通知。直接上程式碼

  • watch監聽類
public class MyWatch implements Watcher {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
                if(Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
                    countDownLatch.countDown();
                    System.out.println("連線中..."+watchedEvent.getState());
                }
                //節點資料發生變化
                else if(watchedEvent.getType() == Event.EventType.NodeDataChanged){
                    String path = watchedEvent.getPath();
                    //能夠註冊事件的方法getData
                    System.out.println("節點資料變化事件--路徑:"+watchedEvent.getPath());
                }
                //建立節點
                else if(watchedEvent.getType() == Event.EventType.NodeCreated){
                    System.out.println("建立節點:路徑:"+watchedEvent.getPath());
                }
                //建立子節點
                else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
                    String path = watchedEvent.getPath();
                    //能夠註冊事件的方法getData
                    System.out.println("子節點資料變化事件--路徑:"+watchedEvent.getPath());
                }
                //節點刪除
                else if(watchedEvent.getType() == Event.EventType.NodeDeleted){
                    System.out.println("刪除節點");
                }
            }
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
    }
}
  • 實現一個watch只需要實現Watcher類就可以了,然後實現裡面的process方法
  • WatchedEvent是事件物件,可以根據不同事件型別進行處理
  • 測試watch類
public class ZkClient {
    private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    public static void main(String[] args) throws Exception {
        MyWatch myWatch = new MyWatch();
        ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
        Stat stat = zk1.exists("/watchNode",true);
        if(stat == null){
            zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }

        stat = zk1.exists("/watchNode",true);
        if(stat == null){
            zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }else{
            System.out.println("客戶端2註冊了watchNode節點的getDate watch");
            zk1.getData("/watchNode",true,stat);
            System.out.println("客戶端1修改watchNode節點資料");
            zk1.setData("/watchNode","1234".getBytes(),-1);
        }
    }
}
  • 執行結果:
    image.png

上面我們寫了最簡單的事件監聽。單個客戶端監聽/watchNode這個節點。zookeeper的監聽是單次通知,第二次操作/watchNode時需要二次監聽。zookeeper提供監聽的方法有三個:exists;getData;getChildren。

  • 多客戶端監聽同一節點
public class ZkClient2 {
    private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
    public static void main(String[] args) throws Exception {
        MyWatch myWatch = new MyWatch();
        ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
        ZooKeeper zk2 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
        //zk1監聽了/watchNode節點事件
        Stat stat1 = zk1.exists("/watchNode",true);
        //zk2監聽了/watchNode節點事件
        Stat stat2 = zk2.exists("/watchNode",true);
        if(stat1 == null){
            zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
        //當zk1建立節點後,zk2也收到了通知
    }
}
  • 執行結果:
    image.png