ZooKeeper Java客戶端
阿新 • • 發佈:2021-06-25
ZooKeeper Java客戶端。
這裡介紹兩個客戶端,一個為 ZooKeeper 原生客戶端,一個為 ZkClient。
首先建立一個maven專案,pom檔案中新增依賴。
<!-- 原生 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <!-- zkclient --> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
一、ZooKeeper原生客戶端
之前安裝過程看到 ZooKeeper 依賴 jdk 環境,可以看出它的原始碼使用了Java語言基礎,學習原生客戶端對於以後看原始碼有幫助,接下來看一看使用方式。
1. 建立會話
/** * @Author SunnyBear * @Description 建立Session */ public class TestCreateSession { /** * ZooKeeper服務地址 */ private final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181"; /** * 會話超時時間 */ private static final int SESSION_TIMEOUT = 30000; public static void main(String[] args) throws IOException, InterruptedException { new TestCreateSession().testSession1(); System.out.println("--------------------------華麗的分割線--------------------------"); new TestCreateSession().testSession2(); } /** * 獲得session的方式,這種方式可能會在ZooKeeper還沒有獲得連線的時候就已經對ZK進行訪問了 * 測試可以發現連線狀態為CONNECTING,而不是CONNECTED */ public void testSession1() throws IOException { ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, null); System.out.println("zooKeeper: " + zooKeeper); System.out.println("zooKeeper.getState(): " + zooKeeper.getState()); } /** * 發令槍 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 使用發令槍對獲得Session的方式進行優化,在ZooKeeper初始化完成以前先等待,等待完成後再進行後續操作 */ public void testSession2() throws IOException, InterruptedException { ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { // 狀態為已連線時才進行後續操作 connectedSemaphore.countDown(); System.out.println("狀態為已連線。。"); } } }); connectedSemaphore.await(); System.out.println("zooKeeper: " + zooKeeper); System.out.println("zooKeeper.getState(): " + zooKeeper.getState()); } }
2. 基本操作
/** * @Author SunnyBear * @Description 基本操作 */ public class TestJavaApi implements Watcher { private static final int SESSION_TIMEOUT = 10000; private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181"; private static final String ZK_PATH = "/SunnyBearApiTest"; private static final String ZK_DATA = "我是SunnyBearApiTest的資料"; private ZooKeeper zk = null; private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 建立連線 * @param server zk伺服器地址列表 * @param sessionTimeout session超時時間 */ public void createConnection(String server, int sessionTimeout) { try { zk = new ZooKeeper(server, sessionTimeout, this); connectedSemaphore.await(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 收到來自Server的Watcher通知,然後呼叫的方法處理 */ public void process(WatchedEvent watchedEvent) { System.out.println("收到事件通知:" + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { connectedSemaphore.countDown(); } } /** * 關閉連線 */ public void releaseConnection() { if (this.zk != null) { try { this.zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 建立節點 * @param path 節點path * @param data 初始資料內容 * @return 是否建立成功 */ public boolean createPath(String path, String data) { try { /** * ZooDefs.Ids.OPEN_ACL_UNSAFE:節點許可權 * CreateMode.EPHEMERAL:節點型別為臨時節點 */ String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("建立節點path:" + createPath); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 讀取指定節點資料 * @param path 節點path * @return 節點內容 */ public String readData(String path) { try { String data = new String(this.zk.getData(path, false, null)); System.out.println("讀取資料成功path:" + path + ",data:" + data); return data; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return ""; } /** * 更新指定節點資料 * @param path 節點path * @param data 資料data * @return 是否成功 */ public boolean writeData(String path, String data) { try { System.out.println("更新資料成功,path:" + path + ", stat: " + this.zk.setData(path, data.getBytes(), -1)); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 刪除節點 * @param path 節點path * @return 是否成功 */ public boolean deleteNode(String path) { try { this.zk.delete(path, -1); System.out.println("刪除節點成功,path:" + path); return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public static void main(String[] args) { TestJavaApi testJavaApi = new TestJavaApi(); testJavaApi.createConnection(SERVER, SESSION_TIMEOUT); if (testJavaApi.createPath(ZK_PATH, ZK_DATA)) { System.out.println("建立後資料內容:" + testJavaApi.readData(ZK_PATH)); testJavaApi.writeData(ZK_PATH, "我是SunnyBearApiTest修改後的資料"); System.out.println("更新後資料內容:" + testJavaApi.readData(ZK_PATH)); testJavaApi.deleteNode(ZK_PATH); } testJavaApi.releaseConnection(); } }
3. 監聽機制
Zookeeper採用了Watcher機制實現資料的釋出/訂閱功能。該機制在被訂閱物件發生變化時會非同步通知客戶端,因此客戶端不必在Watcher註冊後輪詢阻塞,從而減輕了客戶端壓力,Watcher是一次性的,如果被觸發了需要重新註冊。
/**
* @Author SunnyBear
* @Description 監聽機制
*/
public class TestWatcher implements Watcher {
private AtomicInteger seq = new AtomicInteger();
private static final int SESSION_TIMEOUT = 100000;
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final String PARENT_PATH = "/testWatch";
private static final String CHILDREN_PATH = "/testWatch/children";
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zk = null;
/**
* 建立連線
* @param server zk伺服器地址列表
* @param sessionTimeout session超時時間
*/
public void createConnection(String server, int sessionTimeout) {
try {
zk = new ZooKeeper(server, sessionTimeout, this);
connectedSemaphore.await();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 關閉連線
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 建立節點
* @param path 節點path
* @param data 初始資料內容
* @return 是否建立成功
*/
public boolean createPath(String path, String data) {
try {
// 設定監控(由於zookeeper的監控都是一次性的所以 每次必須設定監控)
this.zk.exists(path, true);
// 建立持久節點
String createPath = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("建立節點path:" + createPath);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 讀取指定節點資料
* @param path 節點path
* @param needWatch 表示是否需要註冊一個watcher。true:註冊預設watcher,false:不需要註冊watcher
* @return 節點內容
*/
public String readData(String path, boolean needWatch) {
try {
String data = new String(this.zk.getData(path, needWatch, null));
System.out.println("讀取資料成功path:" + path + ",data:" + data);
return data;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}
/**
* 更新指定節點資料
* @param path 節點path
* @param data 資料data
* @return 是否成功
*/
public boolean writeData(String path, String data) {
try {
System.out.println("更新資料成功,path:" + path + ", stat: "
+ this.zk.setData(path, data.getBytes(), -1));
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 刪除節點
* @param path 節點path
* @return 是否成功
*/
public boolean deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println("刪除節點成功,path:" + path);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 判斷指定節點是否存在
* @param path 節點路徑
* @param needWatch 表示是否需要註冊一個watcher
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 獲取子節點
* @param path 節點路徑
* @param needWatch 表示是否需要註冊一個watcher
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 刪除測試用的所有節點
*/
public void deleteAllTestPath() {
if(this.exists(CHILDREN_PATH, false) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exists(PARENT_PATH, false) != null){
this.deleteNode(PARENT_PATH);
}
}
/**
* 收到來自Server的Watcher通知,然後呼叫的方法處理
*/
public void process(WatchedEvent watchedEvent) {
System.out.println("收到事件通知:" + watchedEvent);
try {
Thread.sleep(200);
if (watchedEvent == null) {
return;
}
// 連線狀態
Event.KeeperState eventState = watchedEvent.getState();
// 事件型別
Event.EventType eventType = watchedEvent.getType();
// 受影響的path
String eventPath = watchedEvent.getPath();
// 列印一下監聽的資訊
String eventPrefix = "[Watch - " + this.seq.incrementAndGet() + "] ";
System.out.println(eventPrefix + "收到Watcher通知");
System.out.println(eventPrefix + "連線狀態:\t" + eventState.toString());
System.out.println(eventPrefix + "事件型別:\t" + eventType.toString());
if (Event.KeeperState.SyncConnected == eventState) {
switch (eventType) {
case None:
System.out.println(eventPrefix + "成功連線zk伺服器");
connectedSemaphore.countDown();
break;
case NodeCreated:
System.out.println(eventPrefix + "節點建立");
Thread.sleep(200);
this.zk.exists(eventPath, true);
break;
case NodeDataChanged:
System.out.println(eventPrefix + "節點資料更新");
Thread.sleep(200);
System.out.println(eventPrefix + "資料內容: " + this.readData(PARENT_PATH, true));
break;
case NodeChildrenChanged:
System.out.println(eventPrefix + "子節點變更");
Thread.sleep(3000);
System.out.println(eventPrefix + "子節點列表:" + this.getChildren(PARENT_PATH, true));
break;
case NodeDeleted:
System.out.println(eventPrefix + "節點 " + eventPath + " 被刪除");
break;
default:
break;
}
} else if (Watcher.Event.KeeperState.Disconnected == eventState) {
System.out.println(eventPrefix + "與ZK伺服器斷開連線");
} else if (Watcher.Event.KeeperState.AuthFailed == eventState) {
System.out.println(eventPrefix + "許可權檢查失敗");
} else if (Watcher.Event.KeeperState.Expired == eventState) {
System.out.println(eventPrefix + "會話失效");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
System.out.println("-------------------------------------------------------------");
}
public static void main(String[] args) throws InterruptedException {
TestWatcher zkWatch = new TestWatcher();
// 建立連線
zkWatch.createConnection(SERVER, SESSION_TIMEOUT);
Thread.sleep(1000);
// 刪除所有節點
zkWatch.deleteAllTestPath();
if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
/**
* 讀取資料,在操作節點資料之前先呼叫zookeeper的getData()方法是為了可以watch到對節點的操作。watch是一次性的
* 也就是說,如果第二次又重新呼叫了setData()方法,在此之前需要重新呼叫一次
*/
System.out.println("------------------------讀取PARENT------------------------");
zkWatch.readData(PARENT_PATH, true);
// 更新資料
zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
/**
* 讀取子節點,設定對子節點變化的watch,如果不寫該方法,則在建立子節點是隻會輸出NodeCreated,而不會輸出NodeChildrenChanged,
* 也就是說建立子節點時沒有watch,
* 如果是遞迴的建立子節點,如path="/p/c1/c2"的話,getChildren(PARENT_PATH, ture)只會在建立c1時watch,輸出c1的NodeChildrenChanged,
* 而不會輸出建立c2時的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,則需要再呼叫一次getChildren(String path, true)方法,
* 其中path="/p/c1"
*/
System.out.println("------------------------讀取CHILDREN------------------------");
zkWatch.getChildren(PARENT_PATH, true);
Thread.sleep(1000);
// 建立子節點
zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
zkWatch.readData(CHILDREN_PATH, true);
zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
}
Thread.sleep(20000);
// 清理節點
zkWatch.deleteAllTestPath();
Thread.sleep(1000);
zkWatch.releaseConnection();
}
}
二、ZkClient
1. 基本操作
/**
* @Author SunnyBear
* @Description ZkClient基本操作
*/
public class TestZkClientApi {
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final int SESSION_TIMEOUT = 30000;
public static void main(String[] args) {
ZkClient zkClient = new ZkClient(SERVER, SESSION_TIMEOUT);
// 建立臨時節點,值為null
zkClient.createEphemeral("/zkTemp");
// 建立持久節點,,值為null,如果父節點不存在則建立
zkClient.createPersistent("/zkPersistent/zk1", true);
// 建立持久節點,有值
zkClient.createPersistent("/zkPersistent/zk2", "zk2內容");
zkClient.createPersistent("/zkPersistent/zk3", "zk3內容");
// 查詢節點下面的所有節點
List<String> childrenList = zkClient.getChildren("/zkPersistent");
for (String p : childrenList) {
String childrenPath = "/zkPersistent/" + p;
// 查詢節點資料
String data = zkClient.readData(childrenPath);
System.out.println(childrenPath + ":" + data);
}
// 修改節點資料
zkClient.writeData("/zkPersistent/zk1", "給zk1更新了內容");
System.out.println(zkClient.readData("/zkPersistent/zk1"));
// 刪除節點
zkClient.delete("/zkTemp");
// 遞迴刪除,即包含子目錄的刪除
zkClient.deleteRecursive("/zkPersistent");
// 關閉連線
zkClient.close();
}
}
2. 監聽機制
/**
* @Author SunnyBear
* @Description ZkClient監聽的測試
*/
public class ZkClientWatcher {
private static final String SERVER = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";
private static final int SESSION_TIMEOUT = 30000;
public static void main(String[] args) throws InterruptedException {
// test1();
test2();
}
/**
* 訂閱子節點的變化
*/
public static void test1() throws InterruptedException {
ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);
// 給父節點新增監聽子節點變化
zkClient.subscribeChildChanges("/zkPersistent", new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChildList) throws Exception {
System.out.println("parentPath:" + parentPath);
System.out.println("currentChildList" + currentChildList);
}
});
Thread.sleep(3000);
zkClient.createPersistent("/zkPersistent");
Thread.sleep(1000);
zkClient.createPersistent("/zkPersistent/"+ "zk1", "zk1內容");
Thread.sleep(1000);
zkClient.createPersistent("/zkPersistent/" + "zk2", "zk2內容");
Thread.sleep(1000);
zkClient.delete("/super/c2");
Thread.sleep(1000);
zkClient.deleteRecursive("/super");
Thread.sleep(10000);
zkClient.close();
}
/**
* 訂閱內容變化
*/
public static void test2() throws InterruptedException {
ZkClient zkClient = new ZkClient(new ZkConnection(SERVER), SESSION_TIMEOUT);
zkClient.createPersistent("/zkPersistent", "zkPersistentData");
// 對父節點新增監聽子節點變化。
zkClient.subscribeDataChanges("/zkPersistent", new IZkDataListener() {
public void handleDataDeleted(String path) throws Exception {
System.out.println("刪除的節點為:" + path);
}
public void handleDataChange(String path, Object data) throws Exception {
System.out.println("變更的節點為:" + path + ", 變更內容為:" + data);
}
});
Thread.sleep(3000);
zkClient.writeData("/zkPersistent", "zkPersistentDataUpdate", -1);
Thread.sleep(1000);
zkClient.delete("/zkPersistent");
Thread.sleep(10000);
}
}
都讀到這裡了,來個 點贊、評論、關注、收藏 吧!
文章作者:IT王小二
首發地址:https://www.itwxe.com/posts/5a33d634/
版權宣告:文章內容遵循 署名-非商業性使用-禁止演繹 4.0 國際 進行許可,轉載請在文章頁面明顯位置給出作者與原文連結。