ZookeeperApi入門——增刪改查
目錄
匯入依賴:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>1.3</version> </dependency>
測試demo:
@Before public void before() { try { CountDownLatch countDownLatch = new CountDownLatch(1); /** * ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) * connectString:zookeeper主機 * sessionTimeout:會話超時時間,單位毫秒 * watcher:實現監視器物件,zookeeper通過監視器物件返回連線狀態 */ zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("zookeeper連線成功!"); countDownLatch.countDown(); } } }); //主執行緒阻塞等待連線物件的建立成功 countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } } @After public void after() { try { if (zooKeeper != null) { zooKeeper.close(); System.out.println("關閉zookeeper!"); } } catch (InterruptedException e) { e.printStackTrace(); } }
- 連線到zookeeper伺服器。zookeeper伺服器為客戶端分配會話ID;
- 定期向伺服器傳送心跳,否則,zookeeper伺服器將過期會話ID,客戶端需要重新連線
- 只要會話ID處於活動狀態,就可以獲取/設定znode
- 所有任務完成後,斷開連線。
1.建立節點
同步方式:create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
非同步方式:create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
- path:znode路徑
- data:要儲存的資料
- acl:要建立的節點的訪問控制列表,zookeeperApi提供了一個靜態介面ZooDefs.Ids來獲取一些基本的acl列表。
- createMode:節點的型別,列舉。
- callback:非同步回撥介面
- ctx:上下文引數
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.apache.zookeeper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.yetus.audience.InterfaceAudience.Public;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@Public
public class ZooDefs {
public static final String[] opNames = new String[]{"notification", "create", "delete", "exists", "getData", "setData", "getACL", "setACL", "getChildren", "getChildren2", "getMaxChildren", "setMaxChildren", "ping"};
public ZooDefs() {
}
@Public
public interface Ids {
Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
Id AUTH_IDS = new Id("auth", "");
@SuppressFBWarnings(
value = {"MS_MUTABLE_COLLECTION"},
justification = "Cannot break API"
)
ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(31, ANYONE_ID_UNSAFE)));
@SuppressFBWarnings(
value = {"MS_MUTABLE_COLLECTION"},
justification = "Cannot break API"
)
ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(31, AUTH_IDS)));
@SuppressFBWarnings(
value = {"MS_MUTABLE_COLLECTION"},
justification = "Cannot break API"
)
ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(1, ANYONE_ID_UNSAFE)));
}
@Public
public interface Perms {
int READ = 1;
int WRITE = 2;
int CREATE = 4;
int DELETE = 8;
int ADMIN = 16;
int ALL = 31;
}
@Public
public interface OpCode {
int notification = 0;
int create = 1;
int delete = 2;
int exists = 3;
int getData = 4;
int setData = 5;
int getACL = 6;
int setACL = 7;
int getChildren = 8;
int sync = 9;
int ping = 11;
int getChildren2 = 12;
int check = 13;
int multi = 14;
int auth = 100;
int setWatches = 101;
int sasl = 102;
int createSession = -10;
int closeSession = -11;
int error = -1;
}
}
1.1 world模式
@Test
public void create1() throws KeeperException, InterruptedException {
/**
* 1.節點的路徑
* 2.節點的資料
* 3.許可權列表 ZooDefs.Ids.OPEN_ACL_UNSAFE -> world:anyone:cdrwa
* 4.節點型別 持久化節點
*
* 建立節點的時候如果父節點路徑不存在,會報異常,如下圖
*/
zooKeeper.create("/create", "create".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/create/node1", "node1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
建立成功後,我們來通過命令視窗檢視下:
[zk: localhost:2181(CONNECTED) 5] get /create/node1
node1
cZxid = 0x76
ctime = Fri Jan 29 10:24:23 CST 2021
mZxid = 0x76
mtime = Fri Jan 29 10:24:23 CST 2021
pZxid = 0x76
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
[zk: localhost:2181(CONNECTED) 6] getAcl /create/node1
'world,'anyone
: cdrwa
只讀:
@Test
public void create2() throws KeeperException, InterruptedException {
zooKeeper.create("/create/node2", "node2".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
}
命令視窗檢視
[zk: localhost:2181(CONNECTED) 7] getAcl /create/node2
'world,'anyone
: r
[zk: localhost:2181(CONNECTED) 8] get /create/node2
node2
cZxid = 0x79
ctime = Fri Jan 29 10:30:23 CST 2021
mZxid = 0x79
mtime = Fri Jan 29 10:30:23 CST 2021
pZxid = 0x79
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
靈活設定:
// world:anyone:acl 模式
@Test
public void create3() throws KeeperException, InterruptedException {
//許可權列表
List<ACL> acls = new ArrayList<>();
Id id = new Id("world", "anyone");
//許可權設定
acls.add(new ACL(ZooDefs.Perms.READ, id));
acls.add(new ACL(ZooDefs.Perms.WRITE, id));
zooKeeper.create("/create/node3","node3".getBytes(),acls, CreateMode.PERSISTENT);
}
[zk: localhost:2181(CONNECTED) 9] getAcl /create/node3
'world,'anyone
: r
'world,'anyone
: w
1.2 IP模式
@Test
public void create4() throws KeeperException, InterruptedException {
//許可權列表
List<ACL> acls = new ArrayList<>();
Id id = new Id("ip", "127.0.0.1");
//許可權設定
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zooKeeper.create("/create/node4","node4".getBytes(),acls, CreateMode.PERSISTENT);
}
[zk: localhost:2181(CONNECTED) 10] getAcl /create/node4
'ip,'127.0.0.1
: cdrwa
這裡也可以有靈活配置,不測了~
1.3 auth模式
全部的許可權
//auth模式
@Test
public void create5() throws KeeperException, InterruptedException {
//新增授權使用者
zooKeeper.addAuthInfo("digest", "cjian:111111".getBytes());
zooKeeper.create("/create/node5","node5".getBytes(),ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
}
[zk: localhost:2181(CONNECTED) 1] getAcl /create/node5
'digest,'cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=
: cdrwa
[zk: localhost:2181(CONNECTED) 2] get /create/node5
Authentication is not valid : /create/node5
[zk: localhost:2181(CONNECTED) 3] addauth digest cjian:111111
[zk: localhost:2181(CONNECTED) 4] get /create/node5
node5
cZxid = 0x87
ctime = Fri Jan 29 11:22:39 CST 2021
mZxid = 0x87
mtime = Fri Jan 29 11:22:39 CST 2021
pZxid = 0x87
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
靈活配置:
@Test
public void create6() throws KeeperException, InterruptedException {
//新增授權使用者
zooKeeper.addAuthInfo("digest", "cjian:111111".getBytes());
//許可權列表
List<ACL> acls = new ArrayList<>();
Id id = new Id("auth", "cjian");
//許可權設定
acls.add(new ACL(ZooDefs.Perms.READ, id));
zooKeeper.create("/create/node6","node6".getBytes(), acls, CreateMode.PERSISTENT);
}
[zk: localhost:2181(CONNECTED) 9] getAcl /create/node6
'digest,'cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=
: r
1.4 digest模式
//digest模式
@Test
public void create7() throws KeeperException, InterruptedException {
//許可權列表
List<ACL> acls = new ArrayList<>();
Id id = new Id("digest", "cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=");
//許可權設定
acls.add(new ACL(ZooDefs.Perms.ALL, id));
zooKeeper.create("/create/node7","node7".getBytes(),acls, CreateMode.PERSISTENT);
}
[zk: localhost:2181(CONNECTED) 11] getAcl /create/node7
'digest,'cjian:ST25maIOy8WImjpR7Nf2/D2wFXQ=
: cdrwa
1.5 持久化順序節點
@Test
public void create8() throws KeeperException, InterruptedException {
String returnValue = zooKeeper.create("/create/node8", "node8".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(returnValue);
}
返回了建立的節點路徑:
zookeeper連線成功!
/create/node80000000008
關閉zookeeper!
其他節點型別不再嘗試...
1.6 非同步建立
上面的建立都是同步的方式,我們來看下非同步的方式
//非同步建立
@Test
public void create9() throws KeeperException, InterruptedException {
zooKeeper.create("/create/node9", "node9".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
//0代表建立成功
System.out.println(rc);
//節點的路徑
System.out.println(path);
//節點的路徑
System.out.println(name);
//上下文引數
System.out.println(ctx);
}
}, "context...");
Thread.sleep(1000);
System.out.println("建立結束!");
}
zookeeper連線成功!
0
/create/node9
/create/node9
context...
建立結束!
關閉zookeeper!
2.更新操作
同步方式:setData(String path, byte[] data, int version) 非同步方式:setData(String path, byte[] data, int version, StatCallback cb, Object ctx)
2.1 同步方式
更新前的值:
[zk: localhost:2181(CONNECTED) 4] get /create/node1
node1
cZxid = 0x76
ctime = Fri Jan 29 10:24:23 CST 2021
mZxid = 0x76
mtime = Fri Jan 29 10:24:23 CST 2021
pZxid = 0x76
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
@Test
public void set1() throws KeeperException, InterruptedException {
/**
* version=-1時,表示版本號不參與更新
*/
Stat stat = zooKeeper.setData("/create/node1", "node111".getBytes(), -1);
System.out.println(stat.getVersion());
}
更新後:
[zk: localhost:2181(CONNECTED) 5] get /create/node1
node111
cZxid = 0x76
ctime = Fri Jan 29 10:24:23 CST 2021
mZxid = 0x9e
mtime = Fri Jan 29 14:07:09 CST 2021
pZxid = 0x76
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
控制檯:
zookeeper連線成功!
1
關閉zookeeper!
2.2非同步方式
@Test
public void set2() throws KeeperException, InterruptedException {
/**
* version=-1時,表示版本號不參與更新
*/
zooKeeper.setData("/create/node1", "node222".getBytes(), 1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object o, Stat stat) {
//0代表更新成功
System.out.println(rc);
//節點的路徑
System.out.println(path);
//上下文引數
System.out.println(o);
//屬性描述物件
System.out.println(stat.getVersion());
}
},"context");
Thread.sleep(1000);
System.out.println("更新結束!");
}
控制檯輸出:
zookeeper連線成功!
0
/create/node1
context
2
更新結束!
關閉zookeeper!
3.刪除操作
同步方式:delete(String path, int version)
非同步方式:delete(String path, int version, VoidCallback cb, Object ctx)
3.1同步方式
@Test
public void delete1() throws KeeperException, InterruptedException {
zooKeeper.delete("/create/node1", -1);
}
3.2非同步方式
@Test
public void delete2() throws KeeperException, InterruptedException {
zooKeeper.delete("/create/node2", -1, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
//0表示的刪除成功
System.out.println(rc);
System.out.println(path);
System.out.println(ctx);
}
}, "context");
Thread.sleep(1000);
System.out.println("刪除結束!");
}
4.檢視節點
同步方式:getData(String path, boolean watch, Stat stat)
非同步方式:getData(String path, Watcher watcher, DataCallback cb, Object ctx)
這裡以及後面的 watch引數暫時不說,放到後面文章的監視器裡說
4.1同步方式
@Test
public void getData1() throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] str = zooKeeper.getData("/create/node3",false,stat);
System.out.println(new String(str));
System.out.println(stat.getVersion());
}
zookeeper連線成功!
node3
0
關閉zookeeper!
4.2非同步方式
@Test
public void getData2() throws KeeperException, InterruptedException {
zooKeeper.getData("/create/node3", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
//0代表讀取成功
System.out.println(rc);
//節點的路徑
System.out.println(path);
//上下文引數物件
System.out.println(path);
//資料
System.out.println(new String(bytes));
//屬性物件
System.out.println(stat.getVersion());
}
},"context");
//Thread.sleep(1000);
System.out.println("讀取結束!");
}
zookeeper連線成功!
0
/create/node3
/create/node3
node3
0
讀取結束!
關閉zookeeper!
5.獲取子節點
同步方式:getChildren(String path, boolean watch)
非同步方式:getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
5.1同步方式
@Test
public void getChildren1() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren("/create", false);
for (String child : children) {
System.out.println(child);
}
}
zookeeper連線成功!
node4
node5
node3
node9
node6
node7
關閉zookeeper!
5.2非同步方式
@Test
public void getChildren2() throws KeeperException, InterruptedException {
zooKeeper.getChildren("/create", false, new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object o, List<String> list) {
System.out.println(rc);
System.out.println(path);
System.out.println(o);
for (String child : list) {
System.out.println(child);
}
}
}, "context");
Thread.sleep(1000);
System.out.println("讀取結束!");
}
zookeeper連線成功!
0
/create
context
node4
node5
node3
node9
node6
node7
讀取結束!
關閉zookeeper!
6.判斷節點是否存在
同步方式:exists(String path, boolean watch)
非同步方式:exists(String path, boolean watch, StatCallback cb, Object ctx)
6.1同步方式
@Test
public void exist1() throws KeeperException, InterruptedException {
Stat stat1 = zooKeeper.exists("/hahah", false);
System.out.println("stat1:"+stat1);
Stat stat2 = zooKeeper.exists("/create", false);
System.out.println("stat2:"+stat2);
}
zookeeper連線成功!
stat1:null
stat2:117,117,1611887063802,1611887063802,0,18,0,0,6,6,226
關閉zookeeper!
6.2非同步方式
@Test
public void exist2() throws KeeperException, InterruptedException {
zooKeeper.exists("/create", false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object o, Stat stat) {
System.out.println(rc);
System.out.println(path);
System.out.println(o);
System.out.println(stat.getVersion());
}
}, "context");
Thread.sleep(1000);
System.out.println("讀取結束!");
}
zookeeper連線成功!
0
/create
context
0
讀取結束!
關閉zookeeper!