使用ZooKeeper提供的Java API操作ZooKeeper
我們先來創建一個普通的maven工程,然後在pom.xml文件中配置zookeeper依賴:
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.11</version> </dependency> </dependencies>
在resources目錄下創建一個zk-connect.properties屬性配置文件,我們在該文件中填寫連接zookeeper服務器的一些配置信息。如下:
# zk.zkServerIp=192.168.190.129:2181 單機模式
zk.zkServerIps=192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181
zk.timeout=5000
註:我這裏使用的集群模式,所以是多個IP。
zookeeper使用的是log4j作為日誌打印工具,所以我們還需要在resources目錄下創建log4j的
log4j.rootLogger=WARN,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.encoding=UTF-8 log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n
然後創建一個連接類demo,代碼如下:
package org.zero01.zk.demo; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.util.Properties; /** * @Description: zookeeper 連接demo演示 */ public class ZKConnect implements Watcher { private static final Logger logger = LoggerFactory.getLogger(ZKConnect.class); // private static String zkServerIp; 單機模式是一個ip // 集群模式則是多個ip private static String zkServerIps; // 連接超時時間 private static Integer timeout; // 加載配置信息 static { Properties properties = new Properties(); InputStream inputStream = Object.class.getResourceAsStream("/zk-connect.properties"); try { properties.load(inputStream); // zkServerIp = properties.getProperty("zk.zkServerIp"); zkServerIps = properties.getProperty("zk.zkServerIps"); timeout = Integer.parseInt(properties.getProperty("zk.timeout")); } catch (Exception e) { logger.error("配置文件讀取異常", e); } finally { try { if (inputStream != null) { inputStream.close(); } } catch (IOException e) { logger.error("關閉流失敗", e); } } } // Watch事件通知 public void process(WatchedEvent watchedEvent) { logger.warn("接收到watch通知:{}", watchedEvent); } public static void main(String[] args) throws IOException, InterruptedException { /** * 客戶端和zk服務端鏈接是一個異步的過程 * 當連接成功後後,客戶端會收的一個watch通知 * * 參數: * connectString:連接服務器的ip字符串, * 比如: "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181" * 可以是一個ip,也可以是多個ip,一個ip代表單機,多個ip代表集群 * 也可以在ip後加路徑 * sessionTimeout:超時時間,心跳收不到了,那就超時 * watcher:通知事件,如果有對應的事件觸發,則會收到一個通知;如果不需要,那就設置為null * canBeReadOnly:可讀,當這個物理機節點斷開後,還是可以讀到數據的,只是不能寫, * 此時數據被讀取到的可能是舊數據,此處建議設置為false,不推薦使用 * sessionId:會話的id * sessionPasswd:會話密碼 當會話丟失後,可以依據 sessionId 和 sessionPasswd 重新獲取會話 */ // 實例化zookeeper客戶端 ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnect()); logger.warn("客戶端開始連接zookeeper服務器..."); logger.warn("連接狀態:{}", zooKeeper.getState()); // 避免發出連接請求就斷開,不然就無法正常連接也無法獲取watch事件的通知 Thread.sleep(2000); logger.warn("連接狀態:{}", zooKeeper.getState()); } }
運行該類後,控制臺輸出日誌信息如下:
2018-04-25 10:41:32,488 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:76)] - [WARN] 客戶端開始連接zookeeper服務器...
2018-04-25 10:41:32,505 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:77)] - [WARN] 連接狀態:CONNECTING
2018-04-25 10:41:32,515 [main-EventThread] [org.zero01.zk.demo.ZKConnect.process(ZKConnect.java:52)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 10:41:34,507 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:81)] - [WARN] 連接狀態:CONNECTED
這樣,我們就完成了一個與zookeeper服務端建立連接的過程。
zk會話重連機制
上一節我們簡單演示了如何去連接zk服務端,本節則介紹一下,如何通過sessionid和session密碼去恢復上一次的會話,也就是zk的會話重連機制。
新建一個類,用做於演示zk會話重連機制的demo:
package org.zero01.zk.demo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @program: zookeeper-connection
* @description: zookeeper 恢復之前的會話連接demo演示
* @author: 01
* @create: 2018-04-25 12:59
**/
public class ZKConnectSessionWatcher implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);
// 集群模式則是多個ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超時時間
private static final Integer timeout = 5000;
// Watch事件通知
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws IOException, InterruptedException {
// 實例化zookeeper客戶端
ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher());
logger.warn("客戶端開始連接zookeeper服務器...");
logger.warn("連接狀態:{}", zooKeeper.getState());
Thread.sleep(2000);
logger.warn("連接狀態:{}", zooKeeper.getState());
// 記錄本次會話的sessionId
long sessionId = zooKeeper.getSessionId();
// 轉換成16進制進行打印
logger.warn("sid:{}", "0x" + Long.toHexString(sessionId));
// 記錄本次會話的session密碼
byte[] sessionPassword = zooKeeper.getSessionPasswd();
Thread.sleep(200);
// 開始會話重連
logger.warn("開始會話重連...");
// 加上sessionId和password參數去實例化zookeeper客戶端
ZooKeeper zkSession = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword);
logger.warn("重新連接狀態zkSession:{}", zkSession.getState());
Thread.sleep(2000);
logger.warn("重新連接狀態zkSession:{}", zkSession.getState());
}
}
運行該類,控制臺輸出日誌結果如下:
2018-04-25 13:48:00,931 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:35)] - [WARN] 客戶端開始連接zookeeper服務器...
2018-04-25 13:48:00,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:36)] - [WARN] 連接狀態:CONNECTING
2018-04-25 13:48:00,951 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:38)] - [WARN] 連接狀態:CONNECTED
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:43)] - [WARN] sid:0x10000e81cfa0002
2018-04-25 13:48:03,136 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:50)] - [WARN] 開始會話重連...
2018-04-25 13:48:03,137 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:53)] - [WARN] 重新連接狀態zkSession:CONNECTING
2018-04-25 13:48:03,142 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:05,140 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:55)] - [WARN] 重新連接狀態zkSession:CONNECTED
同步/異步創建zk節點
以上我們介紹了如何去連接和重連zk服務端,既然知道如何連接zk服務端之後,我們來看一下如何,同步或異步去創建zk節點。
先演示同步創建zk節點的方式,創建一個demo類如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* @program: zookeeper-connection
* @description: 演示同步異步創建zk節點
* @author: 01
* @create: 2018-04-25 13:51
**/
public class ZkNodeOperator implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZkNodeOperator.class);
// 集群模式則是多個ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超時時間
private static final Integer timeout = 5000;
private ZooKeeper zooKeeper;
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public ZkNodeOperator() {
}
public ZkNodeOperator(String connectStr) {
try {
// 在使用該構造器的時候,實例化zk客戶端對象
zooKeeper = new ZooKeeper(connectStr, timeout, new ZkNodeOperator());
} catch (IOException e) {
e.printStackTrace();
try {
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
/**
* @Title: ZKOperatorDemo.java
* @Description: 創建zk節點
*/
public void createZKNode(String path, byte[] data, List<ACL> acls) {
String result = "";
try {
/**
* 同步或者異步創建節點,都不支持子節點的遞歸創建,異步有一個callback函數
* 參數:
* path:節點創建的路徑
* data:節點所存儲的數據的byte[]
* acl:控制權限策略
* Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode:節點類型, 是一個枚舉
* PERSISTENT:持久節點
* PERSISTENT_SEQUENTIAL:持久順序節點
* EPHEMERAL:臨時節點
* EPHEMERAL_SEQUENTIAL:臨時順序節點
*/
// 同步創建zk節點,節點類型為臨時節點
result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
System.out.println("創建節點:\t" + result + "\t成功...");
Thread.sleep(2000);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ZkNodeOperator zkServer = new ZkNodeOperator(zkServerIps);
// 創建zk節點
zkServer.createZKNode("/testNode", "testNode-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
}
運行該類,到服務器上查看是否已創建成功。如下,我這裏是創建成功的:
[root@zk001 ~]# zkCli.sh
[zk: localhost:2181(CONNECTED) 7] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 8] ls /
[zookeeper, data, real-culster] # 因為是臨時節點,所以客戶端斷開之後就消失了
[zk: localhost:2181(CONNECTED) 9] quit
[root@zk001 ~]#
控制臺輸出的日誌信息如下:
2018-04-25 14:16:47,726 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
創建節點: /testNode 成功...
接下來我們演示一下異步創建zk節點的方式,因為異步創建有一個回調函數,所以我們得先創建一個類,實現StringCallback接口裏面的回調方法:
package org.zero01.zk.demo;
import org.apache.zookeeper.AsyncCallback.StringCallback;
public class CreateCallBack implements StringCallback {
// 回調函數
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("創建節點:" + path);
System.out.println((String) ctx);
}
}
修改 ZkNodeOperator 類中的 createZKNode 方法代碼如下:
...
public class ZkNodeOperator implements Watcher {
...
/**
* @Title: ZKOperatorDemo.java
* @Description: 創建zk節點
*/
public void createZKNode(String path, byte[] data, List<ACL> acls) {
try {
...
// 異步步創建zk節點,節點類型為持久節點
String ctx = "{‘create‘:‘success‘}";
zooKeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
運行該類,然後到服務器上查看是否已創建成功。如下,我這裏是創建成功的:
[zk: localhost:2181(CONNECTED) 9] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 10] get /testNode
testNode-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000014
mtime = Wed Apr 25 22:17:26 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0
[zk: localhost:2181(CONNECTED) 11]
控制臺輸出的日誌信息如下:
2018-04-25 14:25:14,923 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
創建節點:/testNode
{‘create‘:‘success‘}
同步/異步修改zk節點數據
同樣的,我們也可以通過Zookeeper提供的Java API去修改zk節點的數據,也是有同步和異步兩種方式,先來演示同步的方式。創建一個新的類,代碼如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @program: zookeeper-connection
* @description: 修改zk節點數據演示
* @author: 01
* @create: 2018-04-25 16:25
**/
public class ZKNodeAlterOperator implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKNodeAlterOperator.class);
// 集群模式則是多個ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超時時間
private static final Integer timeout = 5000;
private ZooKeeper zooKeeper;
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public ZKNodeAlterOperator() {
}
public ZKNodeAlterOperator(String connectStr) {
try {
// 在使用該構造器的時候,實例化zk客戶端對象
zooKeeper = new ZooKeeper(connectStr, timeout, new ZKNodeAlterOperator());
} catch (IOException e) {
e.printStackTrace();
try {
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws KeeperException, InterruptedException {
ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);
/**
* 修改zk節點數據(同步)
* 參數:
* path:節點路徑
* data:新數據
* version 數據版本
*/
Stat status = zkServer.getZooKeeper().setData("/testNode", "this is new data".getBytes(), 0);
// 通過Stat對象可以獲取znode所有的狀態屬性,這裏以version為例
System.out.println("修改成功,當前數據版本為:" + status.getVersion());
}
}
運行該類,到服務器上查看節點是否已成功修改數據。如下,我這裏是修改成功的:
[zk: localhost:2181(CONNECTED) 12] get /testNode
this is new data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000017
mtime = Thu Apr 26 00:21:54 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 0
[zk: localhost:2181(CONNECTED) 13]
控制臺輸出的日誌信息如下:
2018-04-25 16:30:02,111 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:57)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改成功,當前數據版本為:1
接下來演示一下異步修改zk節點數據的方式,和異步創建節點是幾乎一樣的。也是需要新建一個類來實現回調接口的方法,只不過接口不一樣而已。如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;
public class AlterCallBack implements StatCallback {
// 回調函數
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("修改節點:" + path + "成功...");
// 通過Stat對象可以獲取znode所有的狀態屬性,這裏以version為例
System.out.println("當前數據版本為:" + stat.getVersion());
System.out.println((String) ctx);
}
}
然後修改 ZKNodeAlterOperator 類中的main方法代碼如下:
...
public class ZKNodeAlterOperator implements Watcher {
...
public static void main(String[] args) throws KeeperException, InterruptedException {
ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);
/**
* 修改zk節點數據(異步)
* 參數:
* path:節點路徑
* data:新數據
* version: 數據版本
* sc:實現回調函數的對象
* ctx:給回調函數的上下文
*/
String ctx = "{‘alter‘:‘success‘}";
zkServer.getZooKeeper().setData("/testNode", "asynchronous-data".getBytes(), 0, new AlterCallBack(), ctx);
Thread.sleep(2000);
}
}
運行該類,到服務器上查看節點是否已成功修改數據。如下,我這裏是修改成功的:
[zk: localhost:2181(CONNECTED) 16] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x70000001a
mtime = Thu Apr 26 00:35:53 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 17]
控制臺輸出的日誌信息如下:
2018-04-25 16:44:03,472 [main-EventThread] [org.zero01.zk.demo.ZKNodeAlterOperator.process(ZKNodeAlterOperator.java:58)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改節點:/testNode成功...
當前數據版本為:2
{‘alter‘:‘success‘}
同步/異步刪除zk節點
同樣的,刪除節點也有同步和異步兩種方式,在刪除節點操作上,使用異步會更人性化一些,因為有回調通知,同步的方式,除了設置了watch事件,不然是沒有通知的。我們先來看一下同步方式的刪除節點,代碼如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZKNodeDeleteOperator implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKNodeDeleteOperator.class);
// 集群模式則是多個ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超時時間
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
// 創建節點
zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
/**
* 刪除節點(同步)
* 參數:
* path:需要刪除的節點路徑
* version:數據版本
*/
zooKeeper.delete("/testDeleteNode", 0);
zooKeeper.close();
}
}
由於同步的刪除方法不會有返回值,所以我們無法在控制臺輸出內容。
然後再來看一下異步方式的刪除節點,首先需要新建一個類實現回調接口的方法:
package org.zero01.zk.demo;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
public class DeleteCallBack implements VoidCallback {
// 回調函數
public void processResult(int rc, String path, Object ctx) {
System.out.println("刪除節點:" + path + " 成功...");
System.out.println((String) ctx);
}
}
然後修改一下 ZKNodeDeleteOperator 類的main方法:
public class ZKNodeDeleteOperator implements Watcher {
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
// 創建節點
zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
/**
* 刪除節點(異步)
* 參數:
* path:需要刪除的節點路徑
* version:數據版本
* sc:實現回調函數的對象
* ctx:給回調函數的上下文
*/
String ctx = "{‘delete‘:‘success‘}";
zooKeeper.delete("/testDeleteNode", 0, new DeleteCallBack(), ctx);
Thread.sleep(2000);
zooKeeper.close();
}
}
運行該類,控制臺輸出結果如下:
刪除節點:/testDeleteNode 成功...
{‘delete‘:‘success‘}
獲取zk節點數據
以上小節介紹完了增刪改,現在就剩下查了。同樣的查詢也有同步和異步兩種方式,異步的方式在之前的增刪改例子中已經都介紹過了,在查詢裏使用異步也是和增刪改同樣的方式,所以就不再演示查詢的異步了。zk中有三種數據可以查詢:查詢zk節點數據、查詢zk子節點列表、查詢某個zk節點是否存在。本節先介紹如何查詢zk節點數據。
現在zookeeper服務器上,有一個/testNode節點。節點數據內容如下:
[zk: localhost:2181(CONNECTED) 3] get /testNode
asynchronous-data
...
[zk: localhost:2181(CONNECTED) 4]
然後我們來編寫一個 ZKGetNodeData 類,調用zookeeper的API去獲取zk節點數據。代碼示例:
package org.zero01.zk.demo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @program: zookeeper-connection
* @description: 獲取zk節點數據demo
* @author: 01
* @create: 2018-04-26 18:05
**/
public class ZKGetNodeData implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKGetNodeData.class);
// 集群模式則是多個ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超時時間
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
private static Stat stat = new Stat();
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
/**
* 參數:
* path:節點路徑
* watch:true或者false,註冊一個watch事件
* stat:狀態,我們可以通過這個對象獲取節點的狀態信息
*/
byte[] resByte = zooKeeper.getData("/testNode", true, stat);
String result = new String(resByte);
System.out.println("/testNode 節點的數據: " + result);
zooKeeper.close();
}
}
控制臺輸出結果如下:
/testNode 節點的值: asynchronous-data
通過實現 Watcher 接口的通知方法,再結合這個獲取節點數據的API,我們就可以在數據發生改變的時候獲取最新的數據。如下示例,在 ZKGetNodeData 類中,增加代碼如下:
...
public class ZKGetNodeData implements Watcher {
...
// 計數器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
try {
if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
byte[] resByte = zooKeeper.getData("/testNode", false, stat);
String result = new String(resByte);
System.out.println("/testNode 節點的數據發生了變化");
System.out.println("新的數據為: " + result);
System.out.println("新的數據版本號為:" + stat.getVersion());
// 通知完之後,計數器減一
countDownLatch.countDown();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
...
// 等待線程執行
countDownLatch.await();
}
}
這時候由於我們在main裏調用了await()方法,所以主線程會阻塞。然後我們到zookeeper服務器上,對該節點的數據進行操作,如下:
[zk: localhost:2181(CONNECTED) 11] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000011
mtime = Fri Apr 27 03:04:09 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 12] set /testNode new-data 6
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000013
mtime = Fri Apr 27 03:04:35 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: localhost:2181(CONNECTED) 13]
當我們修改了數據之後,控制臺就會輸出如下內容,主線程就會解除阻塞結束執行:
/testNode 節點的數據: asynchronous-data
/testNode 節點的數據發生了變化
新的數據為: new-data
新的數據版本號為:7
獲取zk子節點列表
本節介紹一下如何獲取zk子節點列表,同樣的也是有同步和異步兩種方式,這裏介紹的是同步的。testNode節點下有三個節點,如下:
[zk: localhost:2181(CONNECTED) 20] ls /testNode
[ThreeNode, TwoNode, OneNode]
[zk: localhost:2181(CONNECTED) 21]
我們來寫一個demo獲取這個節點下的子節點列表。代碼如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
/**
* @program: zookeeper-connection
* @description: zookeeper 獲取子節點數據的demo演示
* @author: 01
* @create: 2018-04-26 21:13
**/
public class ZKGetChildrenList implements Watcher{
// 集群模式則是多個ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超時時間
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetChildrenList());
/**
* 參數:
* path:父節點路徑
* watch:true或者false,註冊一個watch事件
*/
List<String> strChildList = zooKeeper.getChildren("/testNode", false);
for (String s : strChildList) {
System.out.println(s);
}
}
}
控制臺就會輸出內容如下:
ThreeNode
TwoNode
OneNode
判斷zk節點是否存在
最後介紹如何判斷一個zk節點是否存在,同樣的也是有同步和異步兩種方式,這裏介紹的是同步的。我們來寫一個demo判斷某個zk節點是否存在。代碼如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* @program: zookeeper-connection
* @description: zookeeper 判斷節點是否存在demo
* @author: 01
* @create: 2018-04-26 22:06
**/
public class ZKNodeExist implements Watcher {
// 集群模式則是多個ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超時時間
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
public void process(WatchedEvent watchedEvent) {
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeExist());
/**
* 參數:
* path:節點路徑
* watch:true或者false,註冊一個watch事件
*/
Stat stat = zooKeeper.exists("/testNode", true);
if (stat != null) {
System.out.println("testNode 節點存在...");
System.out.println("該節點的數據版本為:" + stat.getVersion());
} else {
System.out.println("該節點不存在...");
}
}
}
運行該類,控制臺輸出如下:
testNode 節點存在...
該節點的數據版本為:7
將testNode換成一個不存在的節點,運行該類,控制臺輸出如下:
該節點不存在...
使用ZooKeeper提供的Java API操作ZooKeeper