2018-07-18:03-java操作zookeeper
阿新 • • 發佈:2019-01-02
- 客戶端操作zookeeper學習完後,我們要使用java控制zookeeper。我們先用apache提供的zookeeper功能來操作zookeeper伺服器。
- 我們建立的每個zookeeper都是一個客戶端連結
先看程式碼,然後解釋
1.建立一個maven專案,匯入Apache的zookeeper依賴
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.7</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.37</version> </dependency> </dependencies>
2.編寫客戶端連線
package com.zookeeper.test; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class ZookeeperConnection { private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; private static CountDownLatch countDownLatch = new CountDownLatch(1); //連線過程:not connection connecting connected close public static void main(String[] args) throws Exception { // testConnecting(); testConnected(); } public static void testConnecting() throws Exception{ ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { } }); System.out.println(zk.getState()); } public static void testConnected() throws Exception{ ZooKeeper zk = new ZooKeeper(ZK_CONNECTION, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ countDownLatch.countDown(); System.out.println(watchedEvent.getState()); } } }); countDownLatch.await(); System.out.println(zk.getState()); } }
zookeeper連線過程
- testConnected方法輸出的是connected
在使用countDounLatch的時候我們等待連線完成才輸出 - testConnecting方法輸出的是connecting
沒有使用執行緒等待,直接輸出。出現這種情況是因為連線是非同步的,主執行緒執行完畢,連線還在進行中。
###3.節點的增刪改查
public static void main(String[] args) throws Exception { // testConnecting(); ZooKeeper zk = testConnected(); //建立節點:節點路徑,節點攜帶的值,節點的許可權,建立持久化節點 zk.create("/node","abc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //獲得節點內容: Stat stat = new Stat(); byte[] data = zk.getData("/node",false,stat); System.out.println(new String(data)); //修改節點 zk.setData("/node","cba".getBytes(),-1); Stat stat1 = new Stat(); byte[] data1 = zk.getData("/node",false,stat1); System.out.println(new String(data1)); //刪除節點 zk.delete("/node",-1); }
4.zookeeper的watch功能(重點)
watch類似訂閱釋出模式。監聽某個節點,當節點發生變化監聽節點就會收到通知。直接上程式碼
- watch監聽類
public class MyWatch implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent watchedEvent) {
try {
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
if(Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()){
countDownLatch.countDown();
System.out.println("連線中..."+watchedEvent.getState());
}
//節點資料發生變化
else if(watchedEvent.getType() == Event.EventType.NodeDataChanged){
String path = watchedEvent.getPath();
//能夠註冊事件的方法getData
System.out.println("節點資料變化事件--路徑:"+watchedEvent.getPath());
}
//建立節點
else if(watchedEvent.getType() == Event.EventType.NodeCreated){
System.out.println("建立節點:路徑:"+watchedEvent.getPath());
}
//建立子節點
else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
String path = watchedEvent.getPath();
//能夠註冊事件的方法getData
System.out.println("子節點資料變化事件--路徑:"+watchedEvent.getPath());
}
//節點刪除
else if(watchedEvent.getType() == Event.EventType.NodeDeleted){
System.out.println("刪除節點");
}
}
}catch (Exception e){
System.out.println(e.getMessage());
}
}
}
- 實現一個watch只需要實現Watcher類就可以了,然後實現裡面的process方法
- WatchedEvent是事件物件,可以根據不同事件型別進行處理
- 測試watch類
public class ZkClient {
private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception {
MyWatch myWatch = new MyWatch();
ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
Stat stat = zk1.exists("/watchNode",true);
if(stat == null){
zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
stat = zk1.exists("/watchNode",true);
if(stat == null){
zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}else{
System.out.println("客戶端2註冊了watchNode節點的getDate watch");
zk1.getData("/watchNode",true,stat);
System.out.println("客戶端1修改watchNode節點資料");
zk1.setData("/watchNode","1234".getBytes(),-1);
}
}
}
- 執行結果:
上面我們寫了最簡單的事件監聽。單個客戶端監聽/watchNode這個節點。zookeeper的監聽是單次通知,第二次操作/watchNode時需要二次監聽。zookeeper提供監聽的方法有三個:exists;getData;getChildren。
- 多客戶端監聽同一節點
public class ZkClient2 {
private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception {
MyWatch myWatch = new MyWatch();
ZooKeeper zk1 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
ZooKeeper zk2 = new ZooKeeper(ZK_CONNECTION,5000,myWatch);
//zk1監聽了/watchNode節點事件
Stat stat1 = zk1.exists("/watchNode",true);
//zk2監聽了/watchNode節點事件
Stat stat2 = zk2.exists("/watchNode",true);
if(stat1 == null){
zk1.create("/watchNode","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
//當zk1建立節點後,zk2也收到了通知
}
}
- 執行結果: