1. 程式人生 > WINDOWS開發 >五、Zookeeper基於API操作Node節點

五、Zookeeper基於API操作Node節點

安裝zookeeper :linux下安裝Zookeeper 3.4.14

zookeeper 分為5個包:

  1. org.apache.zookeeper //客戶端主要類檔案

  2. org.apache.zookeeper.data //

  3. org.apache.zookeeper.server

  4. org.apache.zookeeper.server.quorum

  5. org.apache.zookeeper.server.upgrade

建立會話

引入依賴:

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateSession implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);


    
/* 建立會話 */ public static void main(String[] args) throws IOException,InterruptedException { /* 客戶端可以通過建立一個zk例項來連線zk伺服器 new Zookeeper(connectString,sesssionTimeOut,Wather) connectString: 連線地址:IP:埠 sesssionTimeOut:會話超時時間:單位毫秒 Wather:監聽器(當特定事件觸發監聽時,zk會通過watcher通知到客戶端)
*/ ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181",5000,new CreateSession()); System.out.println(zooKeeper.getState()); // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞 countDownLatch.await(); System.out.println("客戶端與服務端會話真正建立了"); } /* 回撥方法:處理來自伺服器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程式在CountDownLatch上的等待阻塞 System.out.println("process方法執行了..."); countDownLatch.countDown(); } } }

注意:ZooKeeper客戶端和服務端會話的建立是一個非同步的過程,也就是說在程式中,構造方法會在處理完客戶端初始化工作後立即返回,在大多數情況下,此時並沒有真正建立好一個可用的會話,在會話的生命週期中處於“ CONNECTING的狀態。當該會話真正建立完畢後 ZooKeeper服務端會向會話對應的客戶端傳送一個事件通知,以告知客戶端,客戶端只有在獲取這個通知之後,才算真正建立了會話

建立節點

節點介紹:

/**
* path :節點建立的路徑
* data[] :節點建立要儲存的資料,是個byte型別的
* acl :節點建立的許可權資訊(4種類型)
* ANYONE_ID_UNSAFE : 表示任何人
* AUTH_IDS :此ID僅可用於設定ACL。它將被客戶機驗證的ID替換。
* OPEN_ACL_UNSAFE :這是一個完全開放的ACL(常用)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予建立者身份驗證ID的所有許可權
* createMode :建立節點的型別(4種類型)
* PERSISTENT:持久節點
* PERSISTENT_SEQUENTIAL:持久順序節點
* EPHEMERAL:臨時節點
* EPHEMERAL_SEQUENTIAL:臨時順序節點
String node = zookeeper.create(path,data,acl,createMode);
*/

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateNote implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;


    /*
      建立會話
     */
    public static void main(String[] args) throws IOException,InterruptedException,KeeperException {

     /*
        客戶端可以通過建立一個zk例項來連線zk伺服器
        new Zookeeper(connectString,Wather)
        connectString: 連線地址:IP:埠
        sesssionTimeOut:會話超時時間:單位毫秒
        Wather:監聽器(當特定事件觸發監聽時,zk會通過watcher通知到客戶端)
     */

         zooKeeper = new ZooKeeper("127.0.0.1:2181",new CreateNote());
        System.out.println(zooKeeper.getState());

        // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);

    }



    /*
        回撥方法:處理來自伺服器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程式在CountDownLatch上的等待阻塞
            System.out.println("process方法執行了...");
            // 建立節點
            try {
                createNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }


    }


    /*
       建立節點的方法
   */
    private static void createNoteSync() throws KeeperException,InterruptedException {

        /**
         *  path        :節點建立的路徑
         *  data[]      :節點建立要儲存的資料,是個byte型別的
         *  acl         :節點建立的許可權資訊(4種類型)
         *                 ANYONE_ID_UNSAFE    : 表示任何人
         *                 AUTH_IDS    :此ID僅可用於設定ACL。它將被客戶機驗證的ID替換。
         *                 OPEN_ACL_UNSAFE    :這是一個完全開放的ACL(常用)--> world:anyone
         *                 CREATOR_ALL_ACL  :此ACL授予建立者身份驗證ID的所有許可權
         *  createMode    :建立節點的型別(4種類型)
         *                  PERSISTENT:持久節點
         *                    PERSISTENT_SEQUENTIAL:持久順序節點
         *                  EPHEMERAL:臨時節點
         *                  EPHEMERAL_SEQUENTIAL:臨時順序節點
         String node = zookeeper.create(path,createMode);
         */

        // 持久節點
        String note_persistent = zooKeeper.create("/lg-persistent","持久節點內容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

        // 臨時節點
        String note_ephemeral = zooKeeper.create("/lg-ephemeral","臨時節點內容".getBytes(),CreateMode.EPHEMERAL);

        // 持久順序節點
        String note_persistent_sequential = zooKeeper.create("/lg-persistent_sequential","持久順序節點內容".getBytes(),CreateMode.PERSISTENT_SEQUENTIAL);

        System.out.println("建立的持久節點" + note_persistent);
        System.out.println("建立的臨時節點" + note_ephemeral);
        System.out.println("建立的持久順序節點" + note_persistent_sequential);

    }
}

讀取節點:

public class GetNoteData implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;


    /*
      建立會話
     */
    public static void main(String[] args) throws IOException,new GetNoteData());
        System.out.println(zooKeeper.getState());

        // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);

    }



    /*
        回撥方法:處理來自伺服器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {

        /*
            子節點列表發生改變時,伺服器端會發生noteChildrenChanged事件通知
            要重新獲取子節點列表,同時注意:通知是一次性的,需要反覆註冊監聽
         */
        if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){

            List<String> children = null;
            try {
                children = zooKeeper.getChildren("/lg-persistent",true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(children);

        }



        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程式在CountDownLatch上的等待阻塞
            System.out.println("process方法執行了...");

            // 獲取節點資料的方法
            try {
                getNoteData();

                // 獲取節點的子節點列表方法
                getChildrens();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        }


    }

    /*
        獲取某個節點的內容
     */
    private void getNoteData() throws KeeperException,InterruptedException {

        /**
         * path    : 獲取資料的路徑
         * watch    : 是否開啟監聽
         * stat    : 節點狀態資訊
         *        null: 表示獲取最新版本的資料
         *  zk.getData(path,watch,stat);
         */
        byte[] data = zooKeeper.getData("/lg-persistent",false,null);
        System.out.println(new String(data));


    }


    /*
        獲取某個節點的子節點列表方法
     */
    public static void getChildrens() throws KeeperException,InterruptedException {

        /*
            path:路徑
            watch:是否要啟動監聽,當子節點列表發生變化,會觸發監聽
            zooKeeper.getChildren(path,watch);
         */
        List<String> children = zooKeeper.getChildren("/lg-persistent",true);
        System.out.println(children);



    }



}

刪除節點

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class DeleteNote implements Watcher {

    private  static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;


    /*
      建立會話
     */
    public static void main(String[] args) throws IOException,new DeleteNote());
        System.out.println(zooKeeper.getState());

        // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞
        //countDownLatch.await();\
        Thread.sleep(Integer.MAX_VALUE);

    }



    /*
        回撥方法:處理來自伺服器端的watcher通知
     */
    public void process(WatchedEvent watchedEvent) {
        // SyncConnected
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

            //解除主程式在CountDownLatch上的等待阻塞
            System.out.println("process方法執行了...");
            // 刪除節點
            try {
                deleteNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        }


    }

    /*
        刪除節點的方法
     */
    private void deleteNoteSync() throws KeeperException,InterruptedException {

        /*
          zooKeeper.exists(path,watch) :判斷節點是否存在
          zookeeper.delete(path,version) : 刪除節點
      */

        Stat stat = zooKeeper.exists("/lg-persistent/c1",false);
        System.out.println(stat == null ? "該節點不存在":"該節點存在");

        if(stat != null){
            zooKeeper.delete("/lg-persistent/c1",-1);
        }

        Stat stat2 = zooKeeper.exists("/lg-persistent/c1",false);
        System.out.println(stat2 == null ? "該節點不存在":"該節點存在");



    }


}

更新節點

 /*
        更新資料節點內容的方法
     */
    private void updateNoteSync() throws KeeperException,InterruptedException {

         /*
            path:路徑
            data:要修改的內容 byte[]
            version:為-1,表示對最新版本的資料進行修改
            zooKeeper.setData(path,version);
         */


        byte[] data = zooKeeper.getData("/lg-persistent",null);
        System.out.println("修改前的值:" + new String(data));

        //修改/lg-persistent 的資料 stat: 狀態資訊物件
        Stat stat = zooKeeper.setData("/lg-persistent","客戶端修改了節點資料".getBytes(),-1);

        byte[] data2 = zooKeeper.getData("/lg-persistent",null);
        System.out.println("修改後的值:" + new String(data2));

    }