1. 程式人生 > 程式設計 >在Java中操作Zookeeper的示例程式碼詳解

在Java中操作Zookeeper的示例程式碼詳解

依賴

 <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.0</version>
  </dependency>

連線到zkServer

//連線字串,zkServer的ip、port,如果是叢集逗號分隔
  String connectStr = "192.168.1.9:2181";

  //zookeeper就是一個zkCli
  ZooKeeper zooKeeper = null;

  try {
     //初始次數為1。後面要在內部類中使用,三種寫法:1、寫成外部類成員變數,不用加final;2、作為函式區域性變數,放在try外面,寫成final;3、寫在try中,不加final
     CountDownLatch countDownLatch = new CountDownLatch(1);
    //超時時間ms,監聽器
    zooKeeper = new ZooKeeper(connectStr,5000,new Watcher() {
      public void process(WatchedEvent watchedEvent) {
        //如果狀態變成已連線
        if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
          System.out.println("連線成功");
          //次數-1
          countDownLatch.countDown();
        }
      }
    });
    //等待,次數為0時才會繼續往下執行。等待監聽器監聽到連線成功,才能操作zk
    countDownLatch.await();
  } catch (IOException | InterruptedException e) {
    e.printStackTrace();
  }


  //...操作zk。後面的demo都是寫在此處的


  //關閉連線
  try {
    zooKeeper.close();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

檢測節點是否存在

// 檢測節點是否存在

  // 同步方式
  Stat exists = null;
  try {
    //如果存在,返回節點狀態Stat;如果不存在,返回null。第二個引數是watch
    exists = zooKeeper.exists("/mall",false);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  if (exists==null){
    System.out.println("節點不存在");
  }
  else {
    System.out.println("節點存在");
  }


  //非同步回撥
  zooKeeper.exists("/mall",false,new AsyncCallback.StatCallback() {
    //第二個是path znode路徑,第三個是ctx 後面傳入實參,第四個是znode的狀態
    public void processResult(int i,String s,Object o,Stat stat) {
      //如果節點不存在,返回的stat是null
      if (stat==null){
        System.out.println("節點不存在");
      }
      else{
        System.out.println("節點存在");
      }
    }
  // 傳入ctx,Object型別
  },null);

操作後,服務端會返回處理結果,返回void、null也算處理結果。

同步指的是當前執行緒阻塞,等待服務端返回資料,收到返回的資料才繼續往下執行;

非同步回撥指的是,把對結果(返回的資料)的處理寫在回撥函式中,當前執行緒不等待返回的資料,繼續往下執行,收到返回的資料時自動呼叫回撥函式來處理。

如果處理返回資料的程式碼之後的操作,不依賴返回資料、對返回資料的處理,那麼可以把返回資料的處理寫成回撥函式。

建立節點

//建立節點

  //同步方式
  try {
    //資料要寫成byte[],不攜帶資料寫成null;預設acl許可權使用ZooDefs.Ids.OPEN_ACL_UNSAFE;最後一個是節點型別,P是永久,E是臨時,S是有序
    zooKeeper.create("/mall","abcd".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
    System.out.println("已建立節點/mall");
    //如果節點已存在,會丟擲異常
  } catch (KeeperException | InterruptedException e) {     System.out.println("建立節點/mall失敗,請檢查節點是否已存在");
    e.printStackTrace();
  }

  //非同步回撥
  zooKeeper.create("/mall",CreateMode.PERSISTENT,new AsyncCallback.Create2Callback(){
    //第二個path,第三個ctx,第四個節點狀態
    public void processResult(int i,String s1,Stat stat) {
      //回撥方式不丟擲異常,返回的stat是建立節點的狀態,如果節點已存在,返回的stat是null
      if (stat==null){
        System.out.println("建立節點/mall失敗,請檢查節點是否已存在");
      }
      else {
        System.out.println("節點建立成功");
      }
    }
    //ctx實參
  },null);

刪除節點

//刪除節點

  //同步方式
  try {
    //第二個引數是版本號,-1表示可以是任何版本
    zooKeeper.delete("/mall1",-1);
    System.out.println("成功刪除節點/mall");
  } catch (InterruptedException | KeeperException e) {
    System.out.println("刪除節點/mall失敗");
    e.printStackTrace();
  }


  //非同步回撥
  zooKeeper.delete("/mall2",-1,new AsyncCallback.VoidCallback() {
    //第二個是path,第三個是ctx
    public void processResult(int i,Object o) {
      
    }
  //
  //ctx實參
  },null);

delete()只能刪除沒有子節點的znode,如果該znode有子節點會丟擲異常。

沒有提供遞迴刪除子節點的方法,如果要刪除帶有子節點的znode,需要自己實現遞迴刪除。可以先getChildren()獲取子節點列表,遍歷列表依次刪除子節點,再刪除父節點。

獲取子節點列表

//獲取子節點列表,List<String>,比如/mall/user,/mall/order,返回的是["user"、"order"]

  //同步方式
  List<String> children = null;
  try {
    //第二個引數是watch
    children = zooKeeper.getChildren("/mall",false);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  System.out.println("子節點列表:" + children);


  //非同步
  zooKeeper.getChildren("/mall",new AsyncCallback.ChildrenCallback() {
    //第二個起依次是:path、ctx、返回的子節點列表
    public void processResult(int i,List<String> list) {
      System.out.println("子節點列表:" + list);
    }
  //ctx實參
  },null);

只獲取子節點,不獲取孫節點。

watch都是:可以寫boolean,要新增監聽就寫true,不監聽寫false;也可以寫Watcher物件,new一個Watcher物件表示要監聽,null表示不監聽。

獲取節點資料

//獲取節點資料,返回byte[]

  //同步方式
  byte[] data = null;
  try {
    //第二個引數是watch,第三個是stat
    data = zooKeeper.getData("/mall",null);
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }
  //呼叫new String()時要判斷data是否為null,如果是null會拋NPE
  if (data==null){
    System.out.println("該節點沒有資料");
  }
  else{
    System.out.println("節點資料:"+new String(data));
  }


  //非同步回撥
  zooKeeper.getData("/mall",new AsyncCallback.DataCallback() {
    //第二個起依次是:path、ctx、返回的節點資料、節點狀態
    public void processResult(int i,byte[] bytes,Stat stat) {
      //不必判斷bytes是否是null,如果節點沒有資料,不會呼叫回撥函式;執行到此,說明bytes不是null
      System.out.println("節點資料:" + new String(bytes) );
    }
    //ctx實參
  },null);

設定|修改節點資料

//設定|更新節點據

  //同步方式
  try {
    //最後一個引數是版本號
    zooKeeper.setData("/mall","1234".getBytes(),-1);
    System.out.println("設定節點資料成功");
  } catch (KeeperException | InterruptedException e) {
    System.out.println("設定節點資料失敗");
    e.printStackTrace();
  }


  //非同步回撥
  zooKeeper.setData("/mall",new AsyncCallback.StatCallback() {
    //第二個是path,第三個是ctx
    public void processResult(int i,Stat stat) {

    }
  // ctx
  },null);

設定acl許可權

//設定acl許可權
    
  //第一個引數指定許可權,第二個是Id物件
  ACL acl = new ACL(ZooDefs.Perms.ALL,new Id("auth","chy:abcd"));
  
  List<ACL> aclList = new ArrayList<>();
  aclList.add(acl);
  
  //如果List中只有一個ACL物件,也可以這樣寫
  //List<ACL> aclList = Collections.singletonList(auth);
    
  //驗證許可權,需寫在設定許可權之前。如果之前沒有設定許可權,也需要先驗證本次即將設定的使用者
  zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

  
  //方式一 setAcl
  try {
    //第二個引數是List<ACL>,第三個引數是版本號
    zooKeeper.setACL("/mall",aclList,-1);
    System.out.println("設定許可權成功");
  } catch (KeeperException | InterruptedException e) {
    System.out.println("設定許可權失敗");
    e.printStackTrace();
  }

  
  //方式二 在建立節點時設定許可權
  try {
    zooKeeper.create("/mall",CreateMode.PERSISTENT);
    System.out.println("已建立節點並設定許可權");
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

設定許可權之後,連線zkServer進行操作時,都需要先驗證使用者。

此處未寫對應的非同步回撥。

檢視acl許可權

//檢視acl許可權
    
  //設定許可權之後,以後操作時需要先驗證使用者,一次session中驗證一次即可
  zooKeeper.addAuthInfo("digest","chy:abcd".getBytes());

  
  //同步方式
  try {
    List<ACL> aclList = zooKeeper.getACL("/mall",null);
    System.out.println("acl許可權:"+aclList);
  } catch (KeeperException | InterruptedException e) {
    System.out.println("獲取acl許可權失敗");
    e.printStackTrace();
  }


  //非同步回撥
  zooKeeper.getACL("/mall3",null,new AsyncCallback.ACLCallback() {
    //第二個起:path、ctx、獲取到的List<ACL>、節點狀態
    public void processResult(int i,List<ACL> list,Stat stat) {
      //就算沒有手動設定acl許可權,預設也是有值的
      System.out.println("acl許可權:"+list);
    }
  //ctx實參
  },null);

新增監聽器

//新增監聽 方式一
  try {
    CountDownLatch countDownLatch = new CountDownLatch(1);

    zooKeeper.getData("/mall",new Watcher() {
      public void process(WatchedEvent watchedEvent) {
        //watcher會監聽該節點所有的事件,不管發生什麼事件都會呼叫process()來處理,需要先判斷一下事件型別
        if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
          System.out.println("節點資料改變了");
          //會一直監聽,如果只監聽一次資料改變,將下面這句程式碼取消註釋即可
          //countDownLatch.countDown();
        }
      }
    },null);
    //預設watcher是一次性的,如果要一直監聽,需要藉助CountDownLatch
    countDownLatch.await();
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

ZooKeeper類的exists()、getData()、getChildren()方法都具有新增監聽的功能,用法類似。

watchedEvent.getType().equals(Event.EventType.NodeDataChanged)
watchedEvent.getState().equals(Event.KeeperState.SyncConnected)

getType是獲取事件型別,getState是獲取連線狀態。

上面這種方式,會遞迴監聽子孫節點,子孫節點的資料改變也算NodeDataChanged,子孫節點的建立|刪除也算NodeCreated|NodeDeleted。

//新增監聽 方式二  
   try {
    CountDownLatch countDownLatch1 = new CountDownLatch(1);
    zooKeeper.addWatch("/mall",new Watcher() {
      @Override
      public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getType().equals(Event.EventType.NodeDataChanged)){
          System.out.println("節點資料改變了");
          //如果只監聽一次資料改變,將下面這句程式碼註釋掉
          //countDownLatch1.countDown();
        }
      }
    //監聽模式,PERSISTENT是不監聽子孫節點,PERSISTENT_RECURSIVE是遞迴監聽子孫節點
    },AddWatchMode.PERSISTENT_RECURSIVE);
    countDownLatch1.await();
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

countDownLatch1.await();要阻塞執行緒,最好啟動一條新執行緒來監聽。

只有設定了監聽的zkCli,該節點發生事件時才會收到zkServer的通知。

watch只儲存在zkServer的記憶體中(zk依賴jdk,執行在jvm上,堆中的session物件),不持久化到硬碟,就是說設定的監聽只在本次會話期間有效,zkCli關閉連線,zkServer在指定時間後(預設連續沒有收到10個心跳),zkServer會自動刪除相關session,watcher丟失。

移除監聽

//移除監聽 方式一
  try {
    zooKeeper.addWatch("/mall",AddWatchMode.PERSISTENT);
    System.out.println("已移除監聽");
  } catch (KeeperException | InterruptedException e) {
    e.printStackTrace();
  }

就是上面新增監聽的哪些方法,watch|watcher引數,如果是boolean型別,設定為false即可關閉監聽;如果是Watcher型別,可以設定null覆蓋掉之前設定的監聽。

//移除監聽 方式二
  try {
    //第二個引數是Watcher,原來新增的那個Watcher監聽物件,不能是null
    //第三個引數指定要移除監聽的哪部分,Any是移除整個監聽,Data是移除對資料的監聽,Children是移除對子節點的遞迴監聽
    //最後一個引數指定未連線到zkServe時,是否移除本地監聽部分
    zooKeeper.removeWatches("/mall",watcher,Watcher.WatcherType.Children,true);
  } catch (InterruptedException | KeeperException e) {
    e.printStackTrace();
  }

監聽由2部分組成,一部分在zkServer上,事件發生時通知對應的zkCli;一部分在zkCli,收到zkServer的通知時做出一些處理。

最後一個引數指定未連線到zkServer,是否移除本地(zkCli)監聽部分,true——移除,false——不移除。

比如說沒有連線到zkServer,移除本地監聽,10個心跳內連上了zkServer,zkServer的監聽部分仍在,發生事件時仍會通知此zkCli,但zkCli本地監聽已經移除了,對通知不會做出處理。

第一種方式會移除整個監聽,不需要傳入監聽物件watcher;

第二種方式功能更全,可以指定移除監聽的哪個部分,但需要傳入watcher物件,新增監聽時要用一個變數來儲存watcher物件。

到此這篇關於在Java中操作Zookeeper的示例程式碼詳解的文章就介紹到這了,更多相關Java操作Zookeeper內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!