Zookeeper客戶端的使用
Zookeeper建立連線:
需要注意的是,zk在main方法中執行的時候,因為zk的連線的非同步進行的,是一個守護執行緒,如果我們主執行緒停止,守護執行緒也會跟著停止,會導致我們根本感應不到zk連線建立main方法就結束了。這裡 使用countDownLatch來等待連線建立完成後再繼續往下走;
檢視到我們zk的狀態為
SyncConnected:代表連線建立好了
type:None;‘
通過這些事件感知到連線已經建立好了;
程式碼實現如下:
private static ZooKeeper zooKeeper=null;
private static CountDownLatch countDownLatch=new CountDownLatch(1);
private final static String CONNECT_STR="192.168.88.3:2181";
private final static Integer SESSION_TIMEOUT=30*1000;
public static void main(String[] args) throws Exception{
zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType()== Event.EventType.None
&& event.getState() == Event.KeeperState.SyncConnected){
log.info("連線已建立");
countDownLatch. countDown();
}
}
});
countDownLatch.await();
}
zk建立節點及監聽
在上面程式碼的基礎上我們寫一個關於建立節點,並且監聽節點的改變;
MyConfig myConfig = new MyConfig();
myConfig.setKey("anykey");
myConfig.setName("anyName");
ObjectMapper objectMapper=new ObjectMapper();
byte[] bytes = objectMapper.writeValueAsBytes(myConfig);
String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Watcher watcher = new Watcher() {
@SneakyThrows
@Override
public void process(WatchedEvent event) {
if (event.getType()== Event.EventType.NodeDataChanged
&& event.getPath()!=null && event.getPath().equals("/myconfig")){
log.info(" PATH:{} 發生了資料變化" ,event.getPath());
byte[] data = zooKeeper.getData("/myconfig", this, null);
MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);
log.info("資料發生變化: {}",newConfig);
}
}
};
byte[] data = zooKeeper.getData("/myconfig", watcher, null);
MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);
log.info("原始資料: {}", originalMyConfig);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
重點程式碼理解:
byte[] bytes = objectMapper.writeValueAsBytes(myConfig);
資料必須以位元組的方式去傳遞
String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
阻塞的方式建立節點
String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
ZooDefs.Ids.OPEN_ACL_UNSAFE 相當於擁有所有許可權
CreateMode.PERSISTENT 建立持久化節點
event.getType()== Event.EventType.NodeDataChanged && event.getPath()!=null && event.getPath().equals("/myconfig"
當前監聽的事件為NodeDataChanged ,監聽的幾點路徑為/myconfig
//將節點新增到監聽中,寫了兩個地方,其中一個地方在通知完成後的地方,用於當通知發放後,還可以繼續監聽
byte[] data = zooKeeper.getData("/myconfig", this, null);
修改節點值
Stat stat = new Stat();
byte[] data = zooKeeper.getData(first_node, false, stat);
int version = stat.getVersion();
zooKeeper.setData(first_node, "third".getBytes(), 0);
//zooKeeper.setData(first_node, "third".getBytes(), version );
利用zooKeeper.getData(first_node, false, stat);來獲取當前節點的start的詳細資訊;(l樂觀鎖)
根據當前版本號進行修改,如果版本資訊不對的話,修改將不成功,防止程式碼併發的情況發生,在zk服務端中是不存在併發的情況的,其執行為阻塞佇列式執行的。
刪除節點值
跟修改節點一樣操作,但是多了一個,如果傳入的值為-1那麼無視版本號刪除;
getZooKeeper().delete("/config",-1);
非同步請求
getZooKeeper().getData("/xsn", false, (rc, path, ctx, data, stat) -> {
Thread thread = Thread.currentThread();
log.info(" Thread Name: {}, rc:{}, path:{}, ctx:{}, data:{}, stat:{}",thread.getName(),rc, path, ctx, data, stat);},"xsn");
log.info(" over .");
執行結果
INFO [main:[email protected]] - over .
INFO [main-EventThread:[email protected]] - Thread Name: main-EventThread, rc:0, path:/xsn, ctx:xsn, data:[116, 101, 115, 116], stat:73,74,1609852651729,1609852662649,1,0,0,0,4,0,73
over在上面那個日誌之後打印出來的
Curator
對zk支援最好的第三方客戶端;封裝方法便於使用加上maven即可使用
建立連線
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
監聽
curatorFramework.getConnectionStateListenable().addListener(((client, newState) -> {
if (newState== ConnectionState.CONNECTED){
log.info(" 連線建立");
}
}));
建立節點
CuratorFramework curatorFramework = getCuratorFramework();
String path = curatorFramework.create().forPath("/curator‐node");
遞迴建立子節點
String pathWithParent = “/node-parent/sub-node-1”;
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
獲取資料
byte[] bytes = curatorFramework.getData().forPath("/curator‐node");
更新資料
curatorFramework.setData().forPath("/curator‐node",“changed!”.getBytes());
byte[] bytes = curatorFramework.getData().forPath("/curator‐node");
刪除節點
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath("/node‐parent");
防止異常的原因發生建立節點,在建立節點的時候先去判斷一下這個節點是否已經存在了,不存在再建立
String forPath = curatorFramework
.create()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
forPath("/curator-node", “some-data”.getBytes());
log.info(“curator create node :{} successfully.”, forPath);