1. 程式人生 > 其它 >zookeeper——客戶端Curator 的簡單操作

zookeeper——客戶端Curator 的簡單操作

技術標籤:zookeeper分散式zookeeper

目錄

curator簡介

1.curator連線zookeeper

2.建立節點

2.1簡單建立

2.2 自定義許可權

2.3遞迴建立

2.4非同步建立

3.修改節點

3.1 同步修改

3.2 非同步修改

4.刪除節點

4.1同步刪除

4.2非同步刪除

5.檢視節點資料

5.1同步方式

5.2非同步方式

6.檢視子節點

6.1同步方式

6.2 非同步方式

7.是否存在

7.1同步方式

7.2非同步方式

8.監視器

8.1監視當前節點

8.2 監聽子節點

9.事務

10.分散式鎖


curator簡介

curator是Netfilx公司開源的一個zookeeper客戶端,後捐獻給apache,curator框架在zookeeper原生API介面上進行了包裝,解決了很多zookeeper客戶端非常底層的細節開發。提供zookeeper各種應用場景(比如:分散式鎖服務、叢集領導選舉、共享計數器、快取機制、分散式佇列等)的抽象封裝,實現了Fluent風格的API介面,是最好用,最流行的zookeeper的客戶端。

匯入依賴:

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.2.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>

1.curator連線zookeeper

package com.cjian.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;


public class CuratorConnection {
    public static void main(String[] args) {
        /**
         * 重連策略
         * new RetryOneTime(3000)  三秒後重連一次,只重連一次
         * new RetryNTimes(3, 3000); 每三秒重連一次,重連三次
         * new RetryUntilElapsed(10000, 3000); 每三秒重連一次,總等待時間超過10秒後停止重連
         * new ExponentialBackoffRetry(1000, 3);隨著重連次數的增加,重連的時間增加  baseSleepTimeMs*Math.max(1,random.nextInt(1<<retryCount+1))
         *
         */
        CuratorFramework client = CuratorFrameworkFactory.builder()
            //IP埠號
            .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
            //會話超時時間
            .sessionTimeoutMs(5000)
            //重連機制  策略
            .retryPolicy(new RetryOneTime(3000))
            //名稱空間  父節點
            .namespace("create")
            //構建連線物件
            .build();
        //開啟連線
        client.start();
        System.out.println(client.isStarted());
        //關閉連線
        client.close();
    }

}

2.建立節點

2.1簡單建立

 @Test
    public void create1() throws Exception {
        client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node1", "node1".getBytes());
        System.out.println("建立結束");
    }
[zk: 127.0.0.1:2182(CONNECTED) 3] ls /
[zookeeper]
[zk: 127.0.0.1:2182(CONNECTED) 4] ls /
[create, zookeeper]
[zk: 127.0.0.1:2182(CONNECTED) 5] get /create/node1
node1
cZxid = 0x300000025
ctime = Tue Feb 02 09:21:13 CST 2021
mZxid = 0x300000025
mtime = Tue Feb 02 09:21:13 CST 2021
pZxid = 0x300000025
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

2.2 自定義許可權

@Test
    public void create2() throws Exception {
        //自定義許可權列表
        ArrayList<ACL> acls = new ArrayList<>();
        Id id = new Id("ip", "127.0.0.1");
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        client.create()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acls)
            .forPath("/node2", "node2".getBytes());
        System.out.println("建立結束");
    }
[zk: 127.0.0.1:2182(CONNECTED) 7] getAcl /create/node2
'ip,'127.0.0.1
: cdrwa

2.3遞迴建立

 @Test
    public void create3() throws Exception {
        //遞迴建立節點樹
        client.create()
            .creatingParentsIfNeeded()//支援遞迴建立節點
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath("/node3/node31", "node31".getBytes());
    }

2.4非同步建立

@Test
    public void create4() throws Exception {
        //非同步方式建立節點
        client.create()
            .creatingParentsIfNeeded()//支援遞迴建立節點
            .withMode(CreateMode.PERSISTENT)
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
                    throws Exception {
                    //節點路徑
                    System.out.println(curatorEvent.getPath());
                    //事件型別
                    System.out.println(curatorEvent.getType());
                }
            })
            .forPath("/node4", "node4".getBytes());
        Thread.sleep(5000);
        System.out.println("建立結束");
    }
/node4
CREATE
建立結束

3.修改節點

3.1 同步修改

@Test
    public void set1() throws Exception {
        client.setData()
            .forPath("/node1", "node11".getBytes());
    }
    @Test
    public void set2() throws Exception {
        client.setData()
            //指定版本號
            .withVersion(-1)
            .forPath("/node1", "node22".getBytes());
    }

3.2 非同步修改

 @Test
    public void set3() throws Exception {
        //非同步方式
        client.setData()
            //指定版本號
            .withVersion(-1)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
                    throws Exception {
                    //節點路徑
                    System.out.println(curatorEvent.getPath());
                    //事件型別
                    System.out.println(curatorEvent.getType());
                }
            })
            .forPath("/node1", "node33".getBytes());
        Thread.sleep(5000);
        System.out.println("修改結束");
    }
/node1
SET_DATA
修改結束

4.刪除節點

4.1同步刪除

  @Test
    public void delete1() throws Exception {
        client.delete().forPath("/node1");
    }
    @Test
    public void delete2() throws Exception {
        client.delete()
            //版本號
            .withVersion(-1)
            .forPath("/node1");
    }

    @Test
    public void delete3() throws Exception {
        client.delete()
            //刪除包含子節點的節點
            .deletingChildrenIfNeeded()
            .forPath("/node1");
    }

4.2非同步刪除

 @Test
    public void delete4() throws Exception {
        client.delete()
            //刪除包含子節點的節點
            .deletingChildrenIfNeeded()
            .withVersion(-1)
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
                    throws Exception {
                    //節點路徑
                    System.out.println(curatorEvent.getPath());
                    //事件型別
                    System.out.println(curatorEvent.getType());
                }
            })
            .forPath("/node1");
        Thread.sleep(5000);
        System.out.println("刪除結束");
    }
/node1
DELETE
刪除結束

5.檢視節點資料

5.1同步方式

  @Test
    public void getData1() throws Exception {
        //讀取節點資料
        byte[] node2s = client.getData().forPath("/node2");
        System.out.println(new String(node2s));
    }

    @Test
    public void getData2() throws Exception {
        Stat stat = new Stat();
        //讀取資料時讀取節點的屬性
        byte[] node2s = client.getData()
            .storingStatIn(stat).forPath("/node2");
        System.out.println(new String(node2s));
        System.out.println(stat.getVersion());

    }

5.2非同步方式

@Test
    public void getData3() throws Exception {
        Stat stat = new Stat();
        //讀取資料時讀取節點的屬性
        byte[] node2s = client.getData()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
                    throws Exception {
                    //節點路徑
                    System.out.println(curatorEvent.getPath());
                    //事件型別
                    System.out.println(curatorEvent.getType());
                    //資料
                    System.out.println(new String(curatorEvent.getData()));
                }
            }).forPath("/node2");
        Thread.sleep(5000);
    }
/node2
GET_DATA
node2

6.檢視子節點

6.1同步方式

 @Test
    public void getChild1() throws Exception {
        List<String> list = client.getChildren().forPath("/node1");
        for (String s : list) {
            System.out.println(s);
        }
    }
node11

6.2 非同步方式

@Test
    public void getChild2() throws Exception {
        client.getChildren()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
                    throws Exception {
                    //節點路徑
                    System.out.println(curatorEvent.getPath());
                    //事件型別
                    System.out.println(curatorEvent.getType());
                    List<String> list = curatorEvent.getChildren();
                    for (String s : list) {
                        System.out.println(s);
                    }
                }
            }).forPath("/node1");
        Thread.sleep(5000);
    }
/node1
CHILDREN
node11

7.是否存在

7.1同步方式

@Test
    public void exists1() throws Exception {
        Stat stat = client.checkExists().forPath("/node1");
        System.out.println(stat.getVersion());
    }

如果不存在,則stat為null

7.2非同步方式

@Test
    public void exists2() throws Exception {
        client.checkExists()
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
                    throws Exception {
                    //節點路徑
                    System.out.println(curatorEvent.getPath());
                    //事件型別
                    System.out.println(curatorEvent.getType());

                    System.out.println(curatorEvent.getStat().getVersion());
                }
            }).forPath("/node1");
    }
/node1
EXISTS
0

8.監視器

curator提供了兩種watcher來監聽節點的變化

  • NodeCache :之間聽某一個特定的節點,監聽節點的新增和修改
  • PathChildrenCache:監聽一個znode的子節點,當一個子節點增加、更新、刪除時PathChildrenCache會改變它的狀態,會包含最新的子節點,子節點的資料和狀態

監視器可重複使用

8.1監視當前節點

@Test
    public void watcher1() throws Exception {
        //監視某個節點的資料變化
        NodeCache nodeCache = new NodeCache(client, "/watcher");
        //開啟監視器物件
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println(nodeCache.getCurrentData().getPath());
                System.out.println(new String(nodeCache.getCurrentData().getData()));
            }
        });
        Thread.sleep(60000);
        //關閉監視器物件
        nodeCache.close();
    }

首先該節點是沒有的,當我們在命令視窗執行:create /create/watcher "111" 後,控制檯輸出:

/watcher
111

當我們在控制檯再次執行:set /create/watcher "222" 後,控制檯輸出:

/watcher
222

8.2 監聽子節點

@Test
    public void watcher2() throws Exception {
        //監視某個子節點的資料變化
        //第三個引數為事件中是否可以獲取節點的資料
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher",true);
        //開啟監視器物件
        pathChildrenCache.start();
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
                throws Exception {
                System.out.println(pathChildrenCacheEvent.getType());
                System.out.println(pathChildrenCacheEvent.getData().getPath());
                System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
            }
        });
        Thread.sleep(60000);
        //關閉監視器物件
        pathChildrenCache.close();
    }

首先該節點是沒有的,當我們在命令視窗執行:create /create/watcher/watcher1 "11" 後,控制檯輸出:

CHILD_ADDED
/watcher/watcher1
11

當我們在控制檯再次執行:set /create/watcher/watcher1 "123" 後,控制檯輸出:

CHILD_UPDATED
/watcher/watcher1
123

當我們在控制檯再次執行:delete /create/watcher/watcher1 後,控制檯輸出:

CHILD_REMOVED
/watcher/watcher1
123

9.事務

//事務
    @Test
    public void tra1() throws Exception {
        //開啟事務
        client.inTransaction().create().forPath("/test1","test1".getBytes())
            .and()
            .setData().forPath("/notExistsNode","not exists".getBytes())
            .and()
            //提交事務
            .commit();
    }

10.分散式鎖

package com.cjian.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLock {
    private static CuratorFramework client;


    public static void main(String[] args) throws InterruptedException {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
            .sessionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            //可選項
            .namespace("lock")
            .build();
        client.start();

          //排他鎖
        InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
        //讀寫鎖
        //InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1");

        Order order = new Order(0,interProcessLock);
        for (int i = 0; i < 5; i++) {
            new Thread(order).start();
        }
        Thread.sleep(10000);
        System.out.println(order.getNum());

    }
}
class  Order implements Runnable{
    private int num;
    private InterProcessLock interProcessLock;

    public Order(int num, InterProcessLock interProcessLock) {
        this.num = num;
        this.interProcessLock = interProcessLock;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                interProcessLock.acquire();
                num++;
                interProcessLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }
}