zookeeper(四)——Java的API、Curator、watcher
阿新 • • 發佈:2019-01-26
一、使用ZooKeeper原生Java API進行節點的新增、查詢、修改、刪除操作
pom檔案:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
程式碼如下:
public class ConnectionDemo { public static void main(String[] args) { try { final CountDownLatch countDownLatch=new CountDownLatch(1); ZooKeeper zooKeeper= new ZooKeeper("192.168.1.200:2181," + "192.168.1.201:2181,192.168.1.202:2181", 4000, new Watcher() { @Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected==event.getState()){ //如果收到了服務端的響應事件,連線成功 countDownLatch.countDown(); } } }); countDownLatch.await(); System.out.println(zooKeeper.getState());//CONNECTED //新增節點 zooKeeper.create("/zk-persis-cj","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); Thread.sleep(1000); Stat stat=new Stat(); //得到當前節點的值 byte[] bytes=zooKeeper.getData("/zk-persis-cj",null,stat); System.out.println(new String(bytes)); //修改節點值 zooKeeper.setData("/zk-persis-cj","1".getBytes(),stat.getVersion()); //得到當前節點的值 byte[] bytes1=zooKeeper.getData("/zk-persis-cj",null,stat); System.out.println(new String(bytes1)); zooKeeper.delete("/zk-persis-cj",stat.getVersion()); zooKeeper.close(); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }
Watcher監控程式碼示例:
public class WatcherDemo { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { final CountDownLatch countDownLatch=new CountDownLatch(1); final ZooKeeper zooKeeper= new ZooKeeper("192.168.1.200:2181," + "192.168.1.201:2181,192.168.1.202:2181", 4000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("預設事件: "+event.getType()); if(Event.KeeperState.SyncConnected==event.getState()){ //如果收到了服務端的響應事件,連線成功 countDownLatch.countDown(); } } }); countDownLatch.await(); zooKeeper.create("/zk-persis-cj","1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //exists getdata getchildren //通過exists繫結事件 Stat stat=zooKeeper.exists("/zk-persis-cj", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getType()+"->"+event.getPath()); try { //再一次去繫結事件 zooKeeper.exists(event.getPath(),true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); //通過修改的事務型別操作來觸發監聽事件 stat=zooKeeper.setData("/zk-persis-mic","2".getBytes(),stat.getVersion()); Thread.sleep(1000); zooKeeper.delete("/zk-persis-mic",stat.getVersion()); System.in.read(); } }
二、使用Curator進行ZooKeeper節點的新增、查詢、修改、刪除操作
pom依賴jar:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
示例程式碼:
/**
 * Demonstrates create / read / update / delete of a znode using Curator.
 */
public class CuratorDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.1.200:2181,"
                        + "192.168.1.201:2181,192.168.1.202:2181")
                .sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("curator")
                .build();
        try {
            curatorFramework.start();

            // Resulting node: /curator/mic/node1 (the namespace is prepended).
            // With the native API parents must be created level by level;
            // creatingParentsIfNeeded() creates the missing parents here.
            curatorFramework.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/mic/node1", "1".getBytes());

            // Read the node; stat receives the version used by the update below.
            Stat stat = new Stat();
            curatorFramework.getData().storingStatIn(stat).forPath("/mic/node1");

            // Update, guarded by the version that was just read.
            curatorFramework.setData()
                    .withVersion(stat.getVersion())
                    .forPath("/mic/node1", "xx".getBytes());

            // Delete last: reading or updating after the delete would fail
            // because the node no longer exists.
            curatorFramework.delete().deletingChildrenIfNeeded().forPath("/mic/node1");
        } finally {
            curatorFramework.close();
        }
    }
}
Watcher監控程式碼示例:
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework=CuratorFrameworkFactory.
builder().connectString("192.168.1.200:2181," +
"192.168.1.201:2181,192.168.1.202:2181").
sessionTimeoutMs(4000).retryPolicy(new
ExponentialBackoffRetry(1000,3)).
namespace("curator").build();
curatorFramework.start();
//當前節點的建立和刪除事件監聽
addListenerWithNodeCache(curatorFramework,"/cj");
//子節點的增加、修改、刪除的事件監聽
addListenerWithPathChildCache(curatorFramework,"/cj");
//綜合節點監聽事件
addListenerWithTreeCache(curatorFramework,"/cj");
System.in.read();
}
public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
TreeCache treeCache=new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListener=new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println(event.getType()+"->"+event.getData().getPath());
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
}
/**
* PathChildCache 監聽一個節點下子節點的建立、刪除、更新
* NodeCache 監聽一個節點的更新和建立事件
* TreeCache 綜合PatchChildCache和NodeCache的特性
*/
public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
PathChildrenCache pathChildrenCache=new PathChildrenCache(curatorFramework,path,true);
PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("Receive Event:"+event.getType());
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
}
public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
final NodeCache nodeCache=new NodeCache(curatorFramework,path,false);
NodeCacheListener nodeCacheListener=new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Receive Event:"+nodeCache.getCurrentData().getPath());
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
}