1. 程式人生 > 其它 >ZookeeperApi入門——增刪改查

ZookeeperApi入門——增刪改查

技術標籤:zookeeperzookeeperjava

目錄

1.建立節點

1.1 world模式

1.2 IP模式

1.3 auth模式

1.4 digest模式

1.5 持久化順序節點

1.6 非同步建立

2.更新操作

2.1 同步方式

2.2非同步方式

3.刪除操作

3.1同步方式

3.2非同步方式

4.檢視節點

4.1同步方式

4.2非同步方式

5.獲取子節點

5.1同步方式

5.2非同步方式

6.判斷節點是否存在

6.1同步方式

6.2非同步方式


匯入依賴:

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
</dependency>
<dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
</dependency>

<dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-core</artifactId>
            <version>1.3</version>
 </dependency>

測試demo:

 @Before
    public void before() {
        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            /**
             * ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
             * connectString:zookeeper主機
             * sessionTimeout:會話超時時間,單位毫秒
             * watcher:實現監視器物件,zookeeper通過監視器物件返回連線狀態
             */
            zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        System.out.println("zookeeper連線成功!");
                        countDownLatch.countDown();
                    }

                }
            });
            //主執行緒阻塞等待連線物件的建立成功
            countDownLatch.await();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @After
    public void after() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
                System.out.println("關閉zookeeper!");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  • 連線到zookeeper伺服器。zookeeper伺服器為客戶端分配會話ID;
  • 定期向伺服器傳送心跳,否則,zookeeper伺服器將過期會話ID,客戶端需要重新連線
  • 只要會話ID處於活動狀態,就可以獲取/設定znode
  • 所有任務完成後,斷開連線。

1.建立節點

同步方式:create(String path, byte[] data, List<ACL> acl, CreateMode createMode)

非同步方式:create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

  • path:znode路徑
  • data:要儲存的資料
  • acl:要建立的節點的訪問控制列表,zookeeperApi提供了一個靜態介面ZooDefs.Ids來獲取一些基本的acl列表。
  • createMode:節點的型別,列舉。
  • callback:非同步回撥介面
  • ctx:上下文引數

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.zookeeper;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.yetus.audience.InterfaceAudience.Public;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

@Public
public class ZooDefs {
    public static final String[] opNames = new String[]{"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping"};

    public ZooDefs() {
    }

    @Public
    public interface Ids {
        Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
        Id AUTH_IDS = new Id("auth", "");
        @SuppressFBWarnings(
            value = {"MS_MUTABLE_COLLECTION"},
            justification = "Cannot break API"
        )
        ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(31, ANYONE_ID_UNSAFE)));
        @SuppressFBWarnings(
            value = {"MS_MUTABLE_COLLECTION"},
            justification = "Cannot break API"
        )
        ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(31, AUTH_IDS)));
        @SuppressFBWarnings(
            value = {"MS_MUTABLE_COLLECTION"},
            justification = "Cannot break API"
        )
        ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(1, ANYONE_ID_UNSAFE)));
    }

    @Public
    public interface Perms {
        int READ = 1;
        int WRITE = 2;
        int CREATE = 4;
        int DELETE = 8;
        int ADMIN = 16;
        int ALL = 31;
    }

    @Public
    public interface OpCode {
        int notification = 0;
        int create = 1;
        int delete = 2;
        int exists = 3;
        int getData = 4;
        int setData = 5;
        int getACL = 6;
        int setACL = 7;
        int getChildren = 8;
        int sync = 9;
        int ping = 11;
        int getChildren2 = 12;
        int check = 13;
        int multi = 14;
        int auth = 100;
        int setWatches = 101;
        int sasl = 102;
        int createSession = -10;
        int closeSession = -11;
        int error = -1;
    }
}

1.1 world模式

@Test
    public void create1() throws KeeperException, InterruptedException {
        /**
         * 1.節點的路徑
         * 2.節點的資料
         * 3.許可權列表   ZooDefs.Ids.OPEN_ACL_UNSAFE ->  world:anyone:cdrwa
         * 4.節點型別 持久化節點
         *
         * 建立節點的時候如果父節點路徑不存在,會報異常,如下圖
         */
        zooKeeper.create("/create", "create".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zooKeeper.create("/create/node1", "node1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

建立成功後,我們來通過命令視窗檢視下:

[zk: localhost:2181(CONNECTED) 5] get /create/node1
node1
cZxid = 0x76
ctime = Fri Jan 29 10:24:23 CST 2021
mZxid = 0x76
mtime = Fri Jan 29 10:24:23 CST 2021
pZxid = 0x76
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
[zk: localhost:2181(CONNECTED) 6] getAcl /create/node1
'world,'anyone
: cdrwa

只讀:

@Test
    public void create2() throws KeeperException, InterruptedException {
        zooKeeper.create("/create/node2", "node2".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

命令視窗檢視

[zk: localhost:2181(CONNECTED) 7] getAcl /create/node2
'world,'anyone
: r
[zk: localhost:2181(CONNECTED) 8] get /create/node2
node2
cZxid = 0x79
ctime = Fri Jan 29 10:30:23 CST 2021
mZxid = 0x79
mtime = Fri Jan 29 10:30:23 CST 2021
pZxid = 0x79
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

靈活設定:

 // world:anyone:acl 模式
    @Test
    public void create3() throws KeeperException, InterruptedException {
        //許可權列表
        List<ACL> acls = new ArrayList<>();
        Id id = new Id("world", "anyone");
        //許可權設定
        acls.add(new ACL(ZooDefs.Perms.READ, id));
        acls.add(new ACL(ZooDefs.Perms.WRITE, id));
        zooKeeper.create("/create/node3","node3".getBytes(),acls, CreateMode.PERSISTENT);
    }
[zk: localhost:2181(CONNECTED) 9] getAcl /create/node3
'world,'anyone
: r
'world,'anyone
: w

1.2 IP模式

 @Test
    public void create4() throws KeeperException, InterruptedException {
        //許可權列表
        List<ACL> acls = new ArrayList<>();
        Id id = new Id("ip", "127.0.0.1");
        //許可權設定
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zooKeeper.create("/create/node4","node4".getBytes(),acls, CreateMode.PERSISTENT);
    }
[zk: localhost:2181(CONNECTED) 10] getAcl /create/node4
'ip,'127.0.0.1
: cdrwa

這裡也可以有靈活配置,不測了~

1.3 auth模式

全部的許可權

 //auth模式
    @Test
    public void create5() throws KeeperException, InterruptedException {
        //新增授權使用者
        zooKeeper.addAuthInfo("digest", "cjian:111111".getBytes());
        zooKeeper.create("/create/node5","node5".getBytes(),ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    }
[zk: localhost:2181(CONNECTED) 1] getAcl /create/node5
'digest,'cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=
: cdrwa
[zk: localhost:2181(CONNECTED) 2] get /create/node5
Authentication is not valid : /create/node5
[zk: localhost:2181(CONNECTED) 3] addauth digest cjian:111111
[zk: localhost:2181(CONNECTED) 4] get /create/node5
node5
cZxid = 0x87
ctime = Fri Jan 29 11:22:39 CST 2021
mZxid = 0x87
mtime = Fri Jan 29 11:22:39 CST 2021
pZxid = 0x87
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

靈活配置:

 @Test
    public void create6() throws KeeperException, InterruptedException {
        //新增授權使用者
        zooKeeper.addAuthInfo("digest", "cjian:111111".getBytes());
        //許可權列表
        List<ACL> acls = new ArrayList<>();
        Id id = new Id("auth", "cjian");
        //許可權設定
        acls.add(new ACL(ZooDefs.Perms.READ, id));

        zooKeeper.create("/create/node6","node6".getBytes(), acls, CreateMode.PERSISTENT);
    }
[zk: localhost:2181(CONNECTED) 9] getAcl /create/node6
'digest,'cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=
: r

1.4 digest模式

//digest模式
    @Test
    public void create7() throws KeeperException, InterruptedException {
        //許可權列表
        List<ACL> acls = new ArrayList<>();
        Id id = new Id("digest", "cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=");
        //許可權設定
        acls.add(new ACL(ZooDefs.Perms.ALL, id));
        zooKeeper.create("/create/node7","node7".getBytes(),acls, CreateMode.PERSISTENT);
    }
[zk: localhost:2181(CONNECTED) 11] getAcl /create/node7
'digest,'cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=
: cdrwa

1.5 持久化順序節點

  @Test
    public void create8() throws KeeperException, InterruptedException {
        String returnValue = zooKeeper.create("/create/node8", "node8".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(returnValue);
    }

返回了建立的節點路徑:

zookeeper連線成功!
/create/node80000000008
關閉zookeeper!

其他節點型別不再嘗試...

1.6 非同步建立

上面的建立都是同步的方式,我們來看下非同步的方式

//非同步建立
    @Test
    public void create9() throws KeeperException, InterruptedException {
        zooKeeper.create("/create/node9", "node9".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
            new AsyncCallback.StringCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx, String name) {
                    //0代表建立成功
                    System.out.println(rc);
                    //節點的路徑
                    System.out.println(path);
                    //節點的路徑
                    System.out.println(name);
                    //上下文引數
                    System.out.println(ctx);
                }
            }, "context...");
        Thread.sleep(1000);
        System.out.println("建立結束!");

    }
zookeeper連線成功!
0
/create/node9
/create/node9
context...
建立結束!
關閉zookeeper!

2.更新操作

同步方式:setData(String path, byte[] data, int version)

非同步方式:setData(String path, byte[] data, int version, StatCallback cb, Object ctx)

2.1 同步方式

更新前的值:

[zk: localhost:2181(CONNECTED) 4] get /create/node1
node1
cZxid = 0x76
ctime = Fri Jan 29 10:24:23 CST 2021
mZxid = 0x76
mtime = Fri Jan 29 10:24:23 CST 2021
pZxid = 0x76
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
 @Test
    public void set1() throws KeeperException, InterruptedException {
        /**
         * version=-1時,表示版本號不參與更新
         */
        Stat stat = zooKeeper.setData("/create/node1", "node111".getBytes(), -1);
        System.out.println(stat.getVersion());
    }

更新後:

[zk: localhost:2181(CONNECTED) 5] get /create/node1
node111
cZxid = 0x76
ctime = Fri Jan 29 10:24:23 CST 2021
mZxid = 0x9e
mtime = Fri Jan 29 14:07:09 CST 2021
pZxid = 0x76
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0

控制檯:

zookeeper連線成功!
1
關閉zookeeper!

2.2非同步方式

 @Test
    public void set2() throws KeeperException, InterruptedException {
        /**
         * version=-1時,表示版本號不參與更新
         */
        zooKeeper.setData("/create/node1", "node222".getBytes(), 1, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object o, Stat stat) {
                //0代表更新成功
                System.out.println(rc);
                //節點的路徑
                System.out.println(path);
                //上下文引數
                System.out.println(o);
                //屬性描述物件
                System.out.println(stat.getVersion());
            }
        },"context");
        Thread.sleep(1000);
        System.out.println("更新結束!");
    }

控制檯輸出:

zookeeper連線成功!
0
/create/node1
context
2
更新結束!
關閉zookeeper!

3.刪除操作

同步方式:delete(String path, int version)

非同步方式:delete(String path, int version, VoidCallback cb, Object ctx)

3.1同步方式

@Test
    public void delete1() throws KeeperException, InterruptedException {
        zooKeeper.delete("/create/node1", -1);
    }

3.2非同步方式

 @Test
    public void delete2() throws KeeperException, InterruptedException {
        zooKeeper.delete("/create/node2", -1, new AsyncCallback.VoidCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx) {
                //0表示的刪除成功
                System.out.println(rc);
                System.out.println(path);
                System.out.println(ctx);

            }
        }, "context");
        Thread.sleep(1000);
        System.out.println("刪除結束!");
    }

4.檢視節點

同步方式:getData(String path, boolean watch, Stat stat)

非同步方式:getData(String path, Watcher watcher, DataCallback cb, Object ctx)

這裡以及後面的 watch引數暫時不說,放到後面文章的監視器裡說

4.1同步方式

 @Test
    public void getData1() throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] str = zooKeeper.getData("/create/node3",false,stat);
        System.out.println(new String(str));
        System.out.println(stat.getVersion());
    }
zookeeper連線成功!
node3
0
關閉zookeeper!

4.2非同步方式

 @Test
    public void getData2() throws KeeperException, InterruptedException {
         zooKeeper.getData("/create/node3", false, new AsyncCallback.DataCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
                 //0代表讀取成功
                 System.out.println(rc);
                 //節點的路徑
                 System.out.println(path);
                 //上下文引數物件
                 System.out.println(path);
                 //資料
                 System.out.println(new String(bytes));
                 //屬性物件
                 System.out.println(stat.getVersion());
             }
         },"context");
        //Thread.sleep(1000);
        System.out.println("讀取結束!");
    }
zookeeper連線成功!
0
/create/node3
/create/node3
node3
0
讀取結束!
關閉zookeeper!

5.獲取子節點

同步方式:getChildren(String path, boolean watch)

非同步方式:getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)

5.1同步方式

 @Test
    public void getChildren1() throws KeeperException, InterruptedException {
        List<String> children = zooKeeper.getChildren("/create", false);
        for (String child : children) {
            System.out.println(child);
        }
    }
zookeeper連線成功!
node4
node5
node3
node9
node6
node7
關閉zookeeper!

5.2非同步方式

@Test
    public void getChildren2() throws KeeperException, InterruptedException {
        zooKeeper.getChildren("/create", false, new AsyncCallback.ChildrenCallback() {
            @Override
            public void processResult(int rc, String path, Object o, List<String> list) {
                System.out.println(rc);
                System.out.println(path);
                System.out.println(o);
                for (String child : list) {
                    System.out.println(child);
                }
            }
        }, "context");
        Thread.sleep(1000);
        System.out.println("讀取結束!");

    }
zookeeper連線成功!
0
/create
context
node4
node5
node3
node9
node6
node7
讀取結束!
關閉zookeeper!

6.判斷節點是否存在

同步方式:exists(String path, boolean watch)

非同步方式:exists(String path, boolean watch, StatCallback cb, Object ctx)

6.1同步方式

@Test
    public void exist1() throws KeeperException, InterruptedException {
        Stat stat1 = zooKeeper.exists("/hahah", false);
        System.out.println("stat1:"+stat1);

        Stat stat2 = zooKeeper.exists("/create", false);
        System.out.println("stat2:"+stat2);
    }
zookeeper連線成功!
stat1:null
stat2:117,117,1611887063802,1611887063802,0,18,0,0,6,6,226

關閉zookeeper!

6.2非同步方式

@Test
    public void exist2() throws KeeperException, InterruptedException {
        zooKeeper.exists("/create", false, new AsyncCallback.StatCallback() {
            @Override
            public void processResult(int rc, String path, Object o, Stat stat) {
                System.out.println(rc);
                System.out.println(path);
                System.out.println(o);
                System.out.println(stat.getVersion());
            }
        }, "context");
        Thread.sleep(1000);
        System.out.println("讀取結束!");
    }
zookeeper連線成功!
0
/create
context
0
讀取結束!
關閉zookeeper!