1. 程式人生 > >Curator 操作 zookeeper 全面講解

Curator 操作 zookeeper 全面講解

zookeeper 的安裝與叢集的搭建 請參考我的另一片文章 https://blog.csdn.net/weixin_40461281/article/details/85336396

首先 建立一個maven專案 (不細講了,不會的自行百度)

匯入curator jar包

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.11.0</version>
        </dependency>

new 一個類 名為 CuratorCreateSessionDemo

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Curator 建立session連線 demo
 */
public class CuratorCreateSessionDemo {

    // 叢集連線地址
    private final static  String CONNECTSTRING = "***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181";

    public static void main(String[] args){

        // 建立會話的兩種方式 normal
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
                CONNECTSTRING,5000,5000,new ExponentialBackoffRetry(1000,3));
        curatorFramework.start();

        // fluent 風格
        CuratorFramework build = CuratorFrameworkFactory.builder()
                .connectString(CONNECTSTRING)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                // 異常重試策略
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                // 名稱空間(以後建立節點都在這個節點之下)
                .namespace("/curator")
                .build();
        // 開啟連線
        build.start();

        System.out.println("success");
    }
}

為了便於之後的使用 我將連線編寫為 工具類 CuratorClientUtils

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Curator客戶端工具類
 */
public class CuratorClientUtils {

    // 叢集連線地址
    private final static  String CONNECTSTRING = "***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181";

    private static CuratorFramework curatorFramework;

    public static CuratorFramework getInstance(){
        curatorFramework = CuratorFrameworkFactory.newClient(
                CONNECTSTRING,5000,5000,new ExponentialBackoffRetry(1000,3));
        curatorFramework.start();
        return curatorFramework;
    }
}

API 操作

建立節點 curator 可以直接建立父節點與子節點 (底層為遞迴建立)  返回值為建立節點的名稱

        // 建立節點
        String s = curatorFramework.create()
                // 同時建立父節點與子節點
                .creatingParentContainersIfNeeded()
                // PERSISTENT 永久節點  EPHEMERAL 臨時節點
                .withMode(CreateMode.PERSISTENT)
                .forPath("/curator/curator1/curator11", "123".getBytes());
        System.out.println(s);

刪除節點 將上一步建立的 /curator 節點刪除 

curator 可以直接指定 父節點進行刪除 會將該父節點下的子節點全部刪除 (底層為遞迴操作)

        // 刪除節點
        curatorFramework.delete()
                // 同時刪除父節點與子節點
                .deletingChildrenIfNeeded()
                .forPath("/curator");
        System.out.println("刪除成功...........");

查詢/獲取節點資料

        // 查詢/獲取資料
        Stat stat = new Stat();
        byte[] bytes1 = curatorFramework.getData().storingStatIn(stat).forPath("/curator/curator1/curator11");
        System.out.println(new String(bytes1) + " ---> stat: " + stat);

修改節點資料

        // 修改資料
        Stat stat1 = curatorFramework.setData().forPath("/curator", "456".getBytes());
        System.out.println(stat1);

        // 查詢/獲取資料
        Stat stat2 = new Stat();
        byte[] bytes2 = curatorFramework.getData().storingStatIn(stat2).forPath("/curator");
        System.out.println(new String(bytes2) + " ---> stat: " + stat2);

非同步操作

        // 非同步操作
        // 建立一個執行緒池
        ExecutorService service = Executors.newFixedThreadPool(1);
        // 建立同步計數器 數值為1
        CountDownLatch countDownLatch = new CountDownLatch(1);
        curatorFramework.create()
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                // 非同步執行
                .inBackground((curatorFramework1, curatorEvent) -> {
                    // 輸出處理執行緒名稱 結果號 執行型別
                    System.out.println(Thread.currentThread().getName() + " -> resultCode" +
                            curatorEvent.getResultCode() + " -> " + curatorEvent.getType());
                    // 計數器遞減
                    countDownLatch.countDown();
                }, service).forPath("/demo","789".getBytes());
                // 最後一個引數為 一個執行緒池(可不填) 表示將任務交給執行緒池來處理
        // 阻塞執行緒 等待計數器歸零
        countDownLatch.await();
        // 關閉執行緒池
        service.shutdown();

事務操作

操作一個錯誤節點

        // 事務操作
        Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
                .create().forPath("/trans", "111".getBytes())
                // 操作一個不存在的節點 會報錯並回退
                .and().setData().forPath("/xxxx", "111".getBytes())
                .and().commit();
        for (CuratorTransactionResult result:commit){
            System.out.println(result.getForPath()+" -> "+ result.getType());
        }

報錯節點不存在

操作一個正確節點

        // 事務操作
        Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
                .create().forPath("/trans", "111".getBytes())
                // 操作一個正確節點
                .and().setData().forPath("/curator", "111".getBytes())
                .and().commit();
        for (CuratorTransactionResult result:commit){
            System.out.println(result.getForPath()+" -> "+ result.getType());
        }

整體程式碼

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Curator API 操作 demo
 */
public class CuratorOperatorDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
        System.out.println("連線成功...........");

        // 建立節點
        String s = curatorFramework.create()
                // 同時建立父節點與子節點
                .creatingParentContainersIfNeeded()
                // PERSISTENT 永久節點  EPHEMERAL 臨時節點
                .withMode(CreateMode.PERSISTENT)
                .forPath("/curator/curator1/curator11", "123".getBytes());
        System.out.println("建立節點: "+s + "內容為 123");

        // 刪除節點
        curatorFramework.delete()
                // 同時刪除父節點與子節點
                .deletingChildrenIfNeeded()
                .forPath("/curator");
        System.out.println("刪除成功...........");

        // 查詢/獲取資料
        Stat stat = new Stat();
        byte[] bytes1 = curatorFramework.getData().storingStatIn(stat).forPath("/curator/curator1/curator11");
        System.out.println(new String(bytes1) + " ---> stat: " + stat);

        // 修改資料
        Stat stat1 = curatorFramework.setData().forPath("/curator", "456".getBytes());
        System.out.println("修改成功: 內容已修改為 456 stat: "+stat1);

        // 查詢/獲取資料
        Stat stat2 = new Stat();
        byte[] bytes2 = curatorFramework.getData().storingStatIn(stat2).forPath("/curator");
        System.out.println(new String(bytes2) + " ---> stat: " + stat2);

        // 非同步操作
        // 建立一個執行緒池
        ExecutorService service = Executors.newFixedThreadPool(1);
        // 建立同步計數器 數值為1
        CountDownLatch countDownLatch = new CountDownLatch(1);
        curatorFramework.create()
                .creatingParentContainersIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                // 非同步執行
                .inBackground((curatorFramework1, curatorEvent) -> {
                    // 輸出處理執行緒名稱 結果號 執行型別
                    System.out.println(Thread.currentThread().getName() + " -> resultCode" +
                            curatorEvent.getResultCode() + " -> " + curatorEvent.getType());
                    // 計數器遞減
                    countDownLatch.countDown();
                }, service).forPath("/demo","789".getBytes());
                // 最後一個引數為 一個執行緒池(可不填) 表示將任務交給執行緒池來處理
        // 阻塞執行緒 等待計數器歸零
        countDownLatch.await();
        // 關閉執行緒池
        service.shutdown();

        // 事務操作
        Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
                .create().forPath("/trans", "111".getBytes())
                // 操作一個正確節點
                .and().setData().forPath("/curator", "111".getBytes())
                // 操作一個不存在的節點 會報錯並回退
//                .and().setData().forPath("/xxxx", "111".getBytes())
                .and().commit();
        for (CuratorTransactionResult result:commit){
            System.out.println(result.getForPath()+" -> "+ result.getType());
        }
    }
}

 

事件監聽

直接送上程式碼 註釋很詳細

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.TimeUnit;

/**
 * Curator 事件監聽 dmeo
 */
public class CuratorEventDemo {

    /**
     * 三種watcher來做節點的監聽
     * pathcache 監聽一個路徑下子節點的 建立 刪除 更新
     * NodeCache 監聽一個節點的 建立 更新 刪除
     * TreeCache pathcache 加 nodecache 的合體 (監聽路徑下的 建立 更新 刪除事件)
     * 快取路徑下的所有子節點的資料
     */

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorClientUtils.getInstance();

        // 節點變化 NodeCache
//        NodeCache cache = new NodeCache(curatorFramework,"/curator",false);
//
//        // true 初始化設定
//        cache.start(true);
//
//        cache.getListenable().addListener(() ->
//                System.out.println("節點資料發生變化,變化後的結果: " + new String(cache.getCurrentData().getData())));
//
//        curatorFramework.setData().forPath("/curator","菲菲".getBytes());

        // PathChildrenCache
        PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/event", true);

        /**
         * Normal 初始化為空
         * BUILD_INITIAL_CACHE 方法 return 之前 呼叫一個 rebuild 操作
         * POST_INITIALIZED_EVENT cache 初始化後發出一個事件
         */
        cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {
            // 事件還有很多,列舉三個
            switch (pathChildrenCacheEvent.getType()) {
                case CHILD_ADDED:
                    System.out.println("增加子節點");
                    break;
                case CHILD_REMOVED:
                    System.out.println("刪除子節點");
                    break;
                case CHILD_UPDATED:
                    System.out.println("更新子節點");
                    break;
                default:
                    break;
            }
        });
        curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
        TimeUnit.SECONDS.sleep(1);

        curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
        TimeUnit.SECONDS.sleep(1);

        curatorFramework.setData().forPath("/event/event1", "222".getBytes());
        TimeUnit.SECONDS.sleep(1);

        curatorFramework.delete().forPath("/event/event1");
//        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/event");
        System.in.read();
    }
}