ZooKeeper 監聽之持續監聽java程式碼實現
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class ZKTest {
private String connectString="hadoop101:2181,hadoop102:2181,hadoop103:2181";
private int sessionTimeout=5000;
private ZooKeeper zooKeeper;
// zkCli.sh
@Before
public void testConnection() throws Exception {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
// process 是watcher的回撥方法
@Override
public void process(WatchedEvent event) {
System.out.println(event);
}
});
System.out.println(zooKeeper.getState());
}
// 持續監聽: 在當前watcher的回撥方法中,再次設定觀察者! 遞迴呼叫!
public String GetData() throws Exception {
// 建立一個watcher
byte[] data = zooKeeper.getData("/atguigu", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath()+"資料發生了變化!====>"+event.getType());
try {
String newResult = GetData();
System.out.println("新的結果是:"+newResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}, null);
return new String(data);
}
@Test
public void testAlwaysSetWatcher() throws Exception {
String data = GetData();
System.out.println("查詢到的資料是:"+data);
// 必須保證程式在執行狀態,才可以獲取到watcher的回撥方法!
while(true) {
Thread.sleep(10000);
System.out.println("我還活著!");
}
}
@After
public void testClose() throws Exception {
if (zooKeeper !=null) {
zooKeeper.close();
}
}
}