Zookeeper使用(二)--Java API
一、前言
上一篇部落格我們通過命令列來操作Zookeper的客戶端和服務端並進行相應的操作,這篇主要介紹如何通過API(JAVA)來操作Zookeeper。
二、開發環境配置
首先開啟Zookeeper服務端(上一篇部落格有具體的方法),方便客戶端連線。
配置開發環境環境可以有兩種方式:① 直接下載相關的依賴Jar包,然後在IDE中新增依賴 ② 建立maven專案,使用maven進行依賴管理。
① 手動新增依賴至IDE
步驟一:點選這裡下載對應版本的Jar包,包括(jar、javadoc.jar、sources.jar),筆者對應下載的Zookeeper3.4.6版本。
步驟二:開啟IDE(筆者使用eclispe),新建名為zookeeper_examples_none_maven的java專案。由於需要單獨新增依賴,為了方便管理,筆者在專案下新建了jar資料夾,用於存放本專案的jar包(將步驟一下載的3個jar包存放至此資料夾下)。
步驟三:在eclipse中新增依賴
步驟四:新建包、Java類進行測試
Zookeeper_Constructor_Usage_Simple.java
package com.hust.grid.leesf.examples; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; public class Zookeeper_Constructor_Usage_Simple implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); @Override public void process(WatchedEvent event) { System.out.println("Receive watched event : " + event); if (KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } public static void main(String[] args) throws IOException { ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_Constructor_Usage_Simple()); System.out.println(zookeeper.getState()); try { connectedSemaphore.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Zookeeper session established"); } }
執行結果如下
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory at org.apache.zookeeper.ZooKeeper.<clinit>(ZooKeeper.java:94) at com.hust.grid.leesf.examples.Zookeeper_Constructor_Usage_Simple.main(Zookeeper_Constructor_Usage_Simple.java:23) Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 2 more
結果表明缺失LoggerFactory類,經筆者查閱資料,只需將zookeeper的lib資料夾下log4j-1.2.16.jar、slf4j-api-1.6.1.jar放如zookeeper_examples_none_maven的jar資料夾下,然後再次將其新增至IDE即可。
再次執行結果如下
CONNECTING
Receive watched event : WatchedEvent state:SyncConnected type:None path:null
Zookeeper session established
表示客戶端已經成功連線至伺服器了。
可以看到方法一相對而言比較麻煩,需要手動管理不同的依賴jar包,可以採用更成熟的依賴管理方法,即使用maven來管理Jar包。
② 使用maven管理依賴
步驟一:新建maven專案
步驟二:配置pom.xml檔案如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hust.grid.leesf</groupId>
<artifactId>zookeeper_examples</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>zookeeper_examples</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
</project>
步驟三:新建java類進行測試
Zookeeper_Constructor_Usage_Simple.java,程式碼同上。
執行結果如下
CONNECTING
Receive watched event : WatchedEvent state:SyncConnected type:None path:null
Zookeeper session established
結果也表示客戶端已經成功連線至伺服器。
三、操作示例
3.1 建立節點
建立節點有非同步和同步兩種方式。無論是非同步或者同步,Zookeeper都不支援遞迴呼叫,即無法在父節點不存在的情況下建立一個子節點,如在/zk-ephemeral節點不存在的情況下建立/zk-ephemeral/ch1節點;並且如果一個節點已經存在,那麼建立同名節點時,會丟擲NodeExistsException異常。
① 同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Zookeeper_Create_API_Sync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_Create_API_Sync_Usage());
System.out.println(zookeeper.getState());
connectedSemaphore.await();
String path1 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("Success create znode: " + path1);
String path2 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Success create znode: " + path2);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
執行結果如下
CONNECTING
Success create znode: /zk-test-ephemeral-
Success create znode: /zk-test-ephemeral-0000000043
結果表明已經成功建立了臨時節點和臨時順序節點,在建立順序節點時,系統會在後面自動增加一串數字。
② 非同步方式
使用非同步方式於同步方式的區別在於節點的建立過程(包括網路通訊和服務端的節點建立過程)是非同步的,在同步介面呼叫過程中,開發者需要關注介面丟擲異常的可能,但是在非同步介面中,介面本身不會丟擲異常,所有異常都會在回撥函式中通過Result Code來體現。
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class Zookeeper_Create_API_ASync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_Create_API_ASync_Usage());
System.out.println(zookeeper.getState());
connectedSemaphore.await();
zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "I am context. ");
zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "I am context. ");
zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new IStringCallback(), "I am context. ");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
class IStringCallback implements AsyncCallback.StringCallback {
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
}
}
執行結果如下
CONNECTING
Create path result: [0, /zk-test-ephemeral-, I am context. , real path name: /zk-test-ephemeral-
Create path result: [-110, /zk-test-ephemeral-, I am context. , real path name: null
Create path result: [0, /zk-test-ephemeral-, I am context. , real path name: /zk-test-ephemeral-0000000045
結果表明已經成功使用非同步方式建立了相應節點。
3.2 刪除節點
只允許刪除葉子節點,即一個節點如果有子節點,那麼該節點將無法直接刪除,必須先刪掉其所有子節點。同樣也有同步和非同步兩種方式。
① 同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Delete_API_Sync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000,
new Delete_API_Sync_Usage());
connectedSemaphore.await();
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
try {
zk.delete(path, -1);
} catch (Exception e) {
System.out.println("fail to delete znode: " + path);
}
zk.delete(path + "/c1", -1);
System.out.println("success delete znode: " + path + "/c1");
zk.delete(path, -1);
System.out.println("success delete znode: " + path);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
執行結果如下
success create znode: /zk-book
success create znode: /zk-book/c1
fail to delete znode: /zk-book
success delete znode: /zk-book/c1
success delete znode: /zk-book
結果表明若節點有子節點,則無法將其刪除,必須先刪除其所有子節點。
② 非同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class Delete_API_ASync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000,
new Delete_API_ASync_Usage());
connectedSemaphore.await();
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
zk.delete(path, -1, new IVoidCallback(), null);
zk.delete(path + "/c1", -1, new IVoidCallback(), null);
zk.delete(path, -1, new IVoidCallback(), null);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
class IVoidCallback implements AsyncCallback.VoidCallback {
public void processResult(int rc, String path, Object ctx) {
System.out.println(rc + ", " + path + ", " + ctx);
}
}
執行結果如下
success create znode: /zk-book
success create znode: /zk-book/c1
-111, /zk-book, null
0, /zk-book/c1, null
0, /zk-book, null
結果結果表明第一次刪除/zk-book的時異常,ResultCode為-111。
3.3 子節點獲取
讀取節點的子節點列表,同樣可以使用同步和非同步的方式進行操作。
① 同步方式
package com.hust.grid.leesf.examples;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class Zookeeper_GetChildren_API_Sync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
String path = "/zk-book-1";
zk = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_GetChildren_API_Sync_Usage());
connectedSemaphore.await();
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c1");
List<String> childrenList = zk.getChildren(path, true);
System.out.println(childrenList);
zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c2");
Thread.sleep(1000);
zk.create(path + "/c3", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c3");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == EventType.NodeChildrenChanged) {
try {
System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));
} catch (Exception e) {
}
}
}
}
}
執行結果如下
success create znode: /zk-book-1
success create znode: /zk-book-1/c1
[c1]
success create znode: /zk-book-1/c2
ReGet Child:[c1, c2]
success create znode: /zk-book-1/c3
ReGet Child:[c3, c1, c2]
值得注意的是,Watcher通知是一次性的,即一旦觸發一次通知後,該Watcher就失效了,因此客戶端需要反覆註冊Watcher,即程式中在process裡面又註冊了Watcher,否則,將無法獲取c3節點的建立而導致子節點變化的事件。
② 非同步方式
package com.hust.grid.leesf.examples;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class Zookeeper_GetChildren_API_ASync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_GetChildren_API_ASync_Usage());
connectedSemaphore.await();
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c1");
zk.getChildren(path, true, new IChildren2Callback(), null);
zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/c2");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == EventType.NodeChildrenChanged) {
try {
System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true));
} catch (Exception e) {
}
}
}
}
}
class IChildren2Callback implements AsyncCallback.Children2Callback {
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: "
+ ctx + ", children list: " + children + ", stat: " + stat);
}
}
執行結果如下
success create znode: /zk-book
success create znode: /zk-book/c1
Get Children znode result: [response code: 0, param path: /zk-book, ctx: null, children list: [c1], stat: 2901,2901,1478226062843,1478226062843,0,1,0,0,0,1,2902
success create znode: /zk-book/c2
ReGet Child:[c1, c2]
結果表示通過非同步的方式可以獲取子節點資訊。
3.4 資料節點獲取
對於節點的資料獲取,同樣存在同步和非同步兩種方式。
① 同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class GetData_API_Sync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000,
new GetData_API_Sync_Usage());
connectedSemaphore.await();
zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
System.out.println("the data of znode " + path + " is : " + new String(zk.getData(path, true, stat)));
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
zk.setData(path, "123".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == EventType.NodeDataChanged) {
try {
System.out.println("the data of znode " + event.getPath() + " is : " + new String(zk.getData(event.getPath(), true, stat)));
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
} catch (Exception e) {
}
}
}
}
}
執行結果如下
success create znode: /zk-book
the data of /zk-book is : 123
czxID: 2924, mzxID: 2924, version: 0
the data of /zk-book is : 123
czxID: 2924, mzxID: 2925, version: 1
結果表明可以使用getData函式獲取節點的資料。
② 非同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class GetData_API_ASync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000,
new GetData_API_ASync_Usage());
connectedSemaphore.await();
zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
zk.getData(path, true, new IDataCallback(), null);
zk.setData(path, "123".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == EventType.NodeDataChanged) {
try {
zk.getData(event.getPath(), true, new IDataCallback(), null);
} catch (Exception e) {
}
}
}
}
}
class IDataCallback implements AsyncCallback.DataCallback {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println("rc: " + rc + ", path: " + path + ", data: " + new String(data));
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
}
}
執行結果如下
success create znode: /zk-book
rc: 0, path: /zk-book, data: 123
czxID: 2932, mzxID: 2932, version: 0
rc: 0, path: /zk-book, data: 123
czxID: 2932, mzxID: 2933, version: 1
結果表明採用非同步方式同樣可方便獲取節點的資料。
3.5 更新資料
在更新資料時,setData方法存在一個version引數,其用於指定節點的資料版本,表明本次更新操作是針對指定的資料版本進行的,但是,在getData方法中,並沒有提供根據指定資料版本來獲取資料的介面,那麼,這裡為何要指定資料更新版本呢,這裡方便理解,可以等效於CAS(compare and swap),對於值V,每次更新之前都會比較其值是否是預期值A,只有符合預期,才會將V原子化地更新到新值B。Zookeeper的setData介面中的version引數可以對應預期值,表明是針對哪個資料版本進行更新,假如一個客戶端試圖進行更新操作,它會攜帶上次獲取到的version值進行更新,而如果這段時間內,Zookeeper伺服器上該節點的資料已經被其他客戶端更新,那麼其資料版本也會相應更新,而客戶端攜帶的version將無法匹配,無法更新成功,因此可以有效地避免分散式更新的併發問題。
① 同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class SetData_API_Sync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000, new SetData_API_Sync_Usage());
connectedSemaphore.await();
zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
zk.getData(path, true, null);
Stat stat = zk.setData(path, "456".getBytes(), -1);
System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion());
Stat stat2 = zk.setData(path, "456".getBytes(), stat.getVersion());
System.out.println("czxID: " + stat2.getCzxid() + ", mzxID: " + stat2.getMzxid() + ", version: " + stat2.getVersion());
try {
zk.setData(path, "456".getBytes(), stat.getVersion());
} catch (KeeperException e) {
System.out.println("Error: " + e.code() + "," + e.getMessage());
}
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
執行結果如下
success create znode: /zk-book
czxID: 2936, mzxID: 2937, version: 1
czxID: 2936, mzxID: 2938, version: 2
Error: BADVERSION,KeeperErrorCode = BadVersion for /zk-book
結果表明由於攜帶的資料版本不正確,而無法成功更新節點。其中,setData中的version引數設定-1含義為客戶端需要基於資料的最新版本進行更新操作。
② 非同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class SetData_API_ASync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000, new SetData_API_ASync_Usage());
connectedSemaphore.await();
zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path);
zk.setData(path, "456".getBytes(), -1, new IStatCallback(), null);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
class IStatCallback implements AsyncCallback.StatCallback {
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
}
}
執行結果如下
success create znode: /zk-book
rc: 0, path: /zk-book, stat: 2942,2943,1478228414526,1478228414545,1,0,0,96876700808708136,3,0,2942
rc(ResultCode)為0,表明成功更新節點資料。
3.6 檢測節點是否存在
在呼叫介面時註冊Watcher的話,還可以對節點是否存在進行監聽,一旦節點被建立、被刪除、資料更新,都會通知客戶端。
① 同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Exist_API_Sync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000, //
new Exist_API_Sync_Usage());
connectedSemaphore.await();
zk.exists(path, true);
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData(path, "123".getBytes(), -1);
zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
zk.delete(path + "/c1", -1);
zk.delete(path, -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
try {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (EventType.NodeCreated == event.getType()) {
System.out.println("success create znode: " + event.getPath());
zk.exists(event.getPath(), true);
} else if (EventType.NodeDeleted == event.getType()) {
System.out.println("success delete znode: " + event.getPath());
zk.exists(event.getPath(), true);
} else if (EventType.NodeDataChanged == event.getType()) {
System.out.println("data changed of znode: " + event.getPath());
zk.exists(event.getPath(), true);
}
}
} catch (Exception e) {
}
}
}
執行結果如下
success create znode: /zk-book
data changed of znode: /zk-book
success create znode: /zk-book/c1
success delete znode: /zk-book
結果表明:
· 無論節點是否存在,都可以通過exists介面註冊Watcher。
· 註冊的Watcher,對節點建立、刪除、資料更新事件進行監聽。
· 對於指定節點的子節點的各種變化,不會通知客戶端。
② 非同步方式
package com.hust.grid.leesf.examples;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class Exist_API_ASync_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk;
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("127.0.0.1:2181", 5000,
new Exist_API_ASync_Usage());
connectedSemaphore.await();
zk.exists(path, true, new IIStatCallback(), null);
zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.setData(path, "123".getBytes(), -1);
zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/c1");
zk.delete(path + "/c1", -1);
zk.delete(path, -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
try {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (EventType.NodeCreated == event.getType()) {
System.out.println("success create znode: " + event.getPath());
zk.exists(event.getPath(), true, new IIStatCallback(), null);
} else if (EventType.NodeDeleted == event.getType()) {
System.out.println("success delete znode: " + event.getPath());
zk.exists(event.getPath(), true, new IIStatCallback(), null);
} else if (EventType.NodeDataChanged == event.getType()) {
System.out.println("data changed of znode: " + event.getPath());
zk.exists(event.getPath(), true, new IIStatCallback(), null);
}
}
} catch (Exception e) {
}
}
}
class IIStatCallback implements AsyncCallback.StatCallback {
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat);
}
}
執行結果如下
rc: -101, path: /zk-book, stat: null
success create znode: /zk-book
rc: 0, path: /zk-book, stat: 2974,2974,1478229717889,1478229717889,0,0,0,0,0,0,2974
data changed of znode: /zk-book
rc: 0, path: /zk-book, stat: 2974,2975,1478229717889,1478229717922,1,0,0,0,3,0,2974
success create znode: /zk-book/c1
success delete znode: /zk-book
rc: -101, path: /zk-book, stat: null
結果表明當節點不存在時,其rc(ResultCode)為-101。
3.7 許可權控制
通過設定Zookeeper伺服器上資料節點的ACL控制,就可以對其客戶端對該資料節點的訪問許可權:如果符合ACL控制,則可以進行訪問,否則無法訪問。
① 使用無許可權資訊的Zookeeper會話訪問含許可權資訊的資料節點
package com.hust.grid.leesf.examples;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class AuthSample_Get {
final static String PATH = "/zk-book-auth_test";
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181", 5000, null);
zookeeper1.addAuthInfo("digest", "foo:true".getBytes());
zookeeper1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + PATH);
ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, null);
zookeeper2.getData(PATH, false, null);
}
}
執行結果如下
success create znode: /zk-book-auth_test
Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-book-auth_test
at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155)
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184)
at com.hust.grid.leesf.examples.AuthSample_Get.main(AuthSample_Get.java:17)
表示許可權不夠,不能進行操作。
② 刪除帶許可權控制的節點
package com.hust.grid.leesf.examples;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class AuthSample_Delete {
final static String PATH = "/zk-book-auth_test";
final static String PATH2 = "/zk-book-auth_test/child";
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181", 5000, null);
zookeeper1.addAuthInfo("digest", "foo:true".getBytes());
zookeeper1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
zookeeper1.create(PATH2, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
try {
ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, null);
zookeeper2.delete(PATH2, -1);
} catch (Exception e) {
System.out.println("fail to delete: " + e.getMessage());
}
ZooKeeper zookeeper3 = new ZooKeeper("127.0.0.1:2181", 5000, null);
zookeeper3.addAuthInfo("digest", "foo:true".getBytes());
zookeeper3.delete(PATH2, -1);
System.out.println("success delete znode: " + PATH2);
ZooKeeper zookeeper4 = new ZooKeeper("127.00.1:2181", 5000, null);
zookeeper4.delete(PATH, -1);
System.out.println("success delete znode: " + PATH);
}
}
執行結果如下
fail to delete: KeeperErrorCode = NoAuth for /zk-book-auth_test/child
success delete znode: /zk-book-auth_test/child
success delete znode: /zk-book-auth_test
結果表明若沒有許可權,則無法刪除節點。
四、總結
基於原生態的JAVA API的呼叫相對較簡單,筆者後續會對原始碼進行分析。本部落格的所有程式碼也同步上傳至github,也謝謝各位園友的觀看~