curator中監聽器的實現
1:curator中的三種監聽器實現
第一種(NodeCache)可以監控指定節點本身的建立、刪除以及資料的修改,僅限於該節點本身,不包含其子節點。
public class CuratorNodeCacheListener {
private static final String PATH =”/example/cache”;
private static final String zkStr=”master:2181,worker1:2181,worker2:2181”;
private static InterProcessMutex lock;
public static void main(String [] args) throws Exception{ CuratorFramework client = null; NodeCache cache = null; try { client = CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3)); client.start(); cache = new NodeCache(client,PATH); cache.start(); processCommand(cache); } finally { CloseableUtils.closeQuietly(cache); CloseableUtils.closeQuietly(client); } } private static void processCommand( NodeCache cache) { boolean isDone=false; try{ addListener(cache); while(!isDone){ Thread.sleep(Long.MAX_VALUE); } }catch (Exception e){ }finally { } } private static void addListener(final NodeCache cache) { // a PathChildrenCacheListener is optional. Here, it's used just to log changes NodeCacheListener listener=new NodeCacheListener() { public void nodeChanged() throws Exception { System.out.println("catch node data change"); ChildData childData=cache.getCurrentData(); if(childData==null){ System.out.println("data is delete childData="+childData); }else{ System.out.println("data is update data="+childData.getData().toString()); } } }; cache.getListenable().addListener(listener); }
}
第二種(PathChildrenCache)主要監聽某一路徑對應節點下直接子節點的建立、刪除以及資料的修改,僅限於直接子節點,更深層的節點無法監控。
public class CuratorPathChildListener {
private static final String PATH = “/example/cache”;
private static final String zkStr=”master:2181,worker1:2181,worker2:2181”;
public static void main(String[] args) throws Exception{
CuratorFramework client = null;
PathChildrenCache cache = null;
try
{
client = CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
client.start();
cache = new PathChildrenCache(client, PATH, true);
cache.start();
processCommand(cache);
}
finally
{
CloseableUtils.closeQuietly(cache);
CloseableUtils.closeQuietly(client);
}
}
private static void processCommand(PathChildrenCache cache) { boolean isDone=false; try{ addListener(cache); while(!isDone){ Thread.sleep(Long.MAX_VALUE); } }catch (Exception e){ }finally { } } private static void addListener(PathChildrenCache cache) { // a PathChildrenCacheListener is optional. Here, it's used just to log changes PathChildrenCacheListener listener = new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch ( event.getType() ) { case CHILD_ADDED: { System.out.println("Node added: " + event.getData().getPath()); break; } case CHILD_UPDATED: { System.out.println("Node changed: " + event.getData().getPath()); break; } case CHILD_REMOVED: { System.out.println("Node removed: " + event.getData().getPath()); break; } } } }; cache.getListenable().addListener(listener); }
}