1. 程式人生 > 其它 >Zookeeper客戶端的使用

Zookeeper客戶端的使用

技術標籤:筆記zookeeper

Zookeeper建立連線
需要注意的是,zk在main方法中執行的時候,因為zk的連線的非同步進行的,是一個守護執行緒,如果我們主執行緒停止,守護執行緒也會跟著停止,會導致我們根本感應不到zk連線建立main方法就結束了。這裡 使用countDownLatch來等待連線建立完成後再繼續往下走;
在這裡插入圖片描述
檢視到我們zk的狀態為
SyncConnected:代表連線建立好了
type:None;‘
通過這些事件感知到連線已經建立好了;
程式碼實現如下:

private static ZooKeeper zooKeeper=null;
    private static
CountDownLatch countDownLatch=new CountDownLatch(1); private final static String CONNECT_STR="192.168.88.3:2181"; private final static Integer SESSION_TIMEOUT=30*1000; public static void main(String[] args) throws Exception{ zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT,
new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType()== Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){ log.info("連線已建立"); countDownLatch.
countDown(); } } }); countDownLatch.await(); }

zk建立節點及監聽
在上面程式碼的基礎上我們寫一個關於建立節點,並且監聽節點的改變;

MyConfig myConfig = new MyConfig();
        myConfig.setKey("anykey");
        myConfig.setName("anyName");
        ObjectMapper objectMapper=new ObjectMapper();
        byte[] bytes = objectMapper.writeValueAsBytes(myConfig);
        String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Watcher watcher = new Watcher() {
            @SneakyThrows
            @Override
            public void process(WatchedEvent event) {
                if (event.getType()== Event.EventType.NodeDataChanged
                        && event.getPath()!=null && event.getPath().equals("/myconfig")){
                    log.info(" PATH:{}  發生了資料變化" ,event.getPath());
                    byte[] data = zooKeeper.getData("/myconfig", this, null);
                    MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);
                    log.info("資料發生變化: {}",newConfig);
                }
            }
        };
        byte[] data = zooKeeper.getData("/myconfig", watcher, null);
        MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);
        log.info("原始資料: {}", originalMyConfig);
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

重點程式碼理解
byte[] bytes = objectMapper.writeValueAsBytes(myConfig);
資料必須以位元組的方式去傳遞
String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
阻塞的方式建立節點

String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

ZooDefs.Ids.OPEN_ACL_UNSAFE 相當於擁有所有許可權
CreateMode.PERSISTENT 建立持久化節點

event.getType()== Event.EventType.NodeDataChanged && event.getPath()!=null && event.getPath().equals("/myconfig"

當前監聽的事件為NodeDataChanged ,監聽的幾點路徑為/myconfig

//將節點新增到監聽中,寫了兩個地方,其中一個地方在通知完成後的地方,用於當通知發放後,還可以繼續監聽
byte[] data = zooKeeper.getData("/myconfig", this, null);
修改節點值

Stat stat = new Stat();
byte[] data = zooKeeper.getData(first_node, false, stat);
 int version = stat.getVersion();
zooKeeper.setData(first_node, "third".getBytes(), 0);
//zooKeeper.setData(first_node, "third".getBytes(), version );

利用zooKeeper.getData(first_node, false, stat);來獲取當前節點的start的詳細資訊;(l樂觀鎖)
根據當前版本號進行修改,如果版本資訊不對的話,修改將不成功,防止程式碼併發的情況發生,在zk服務端中是不存在併發的情況的,其執行為阻塞佇列式執行的。
刪除節點值
跟修改節點一樣操作,但是多了一個,如果傳入的值為-1那麼無視版本號刪除;
getZooKeeper().delete("/config",-1);
非同步請求

getZooKeeper().getData("/xsn", false, (rc, path, ctx, data, stat) -> {
              Thread thread = Thread.currentThread();
              log.info(" Thread Name: {},   rc:{}, path:{}, ctx:{}, data:{}, stat:{}",thread.getName(),rc, path, ctx, data, stat);},"xsn");
              log.info(" over .");

執行結果
INFO [main:[email protected]] - over .
INFO [main-EventThread:[email protected]] - Thread Name: main-EventThread, rc:0, path:/xsn, ctx:xsn, data:[116, 101, 115, 116], stat:73,74,1609852651729,1609852662649,1,0,0,0,4,0,73
over在上面那個日誌之後打印出來的
Curator
對zk支援最好的第三方客戶端;封裝方法便於使用加上maven即可使用
建立連線
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
監聽
curatorFramework.getConnectionStateListenable().addListener(((client, newState) -> {
if (newState== ConnectionState.CONNECTED){
log.info(" 連線建立");
}
}));
建立節點
CuratorFramework curatorFramework = getCuratorFramework();
String path = curatorFramework.create().forPath("/curator‐node");
遞迴建立子節點
String pathWithParent = “/node-parent/sub-node-1”;
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
獲取資料
byte[] bytes = curatorFramework.getData().forPath("/curator‐node");
更新資料
curatorFramework.setData().forPath("/curator‐node",“changed!”.getBytes());
byte[] bytes = curatorFramework.getData().forPath("/curator‐node");
刪除節點
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath("/node‐parent");
防止異常的原因發生建立節點,在建立節點的時候先去判斷一下這個節點是否已經存在了,不存在再建立
String forPath = curatorFramework
.create()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).
forPath("/curator-node", “some-data”.getBytes());
log.info(“curator create node :{} successfully.”, forPath);