CuratorFramework節點監聽二 TreeCache
阿新 • • 發佈:2019-02-04
節點監聽
2.TreeCache
特點:
(1)永久監聽指定節點下的節點的變化
(2)可以監聽到指定節點下所有層級節點的變化,比如說指定節點"/example",在下面新增"node1"可以監聽到,新增更深一層的"node1/n1"同樣也能被監聽到
(3)可以監聽到的事件:節點建立、節點資料的變化、節點刪除等
使用方式:
(1)建立 CuratorFramework 的 client
(2)新增TreeCache
(3)啟動client 和 TreeCache
(4)註冊監聽器
例子
package cache;
import com.google.common.collect.Lists;
import discovery.ExampleServer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Interactive demo of Curator's {@link TreeCache} recipe.
 *
 * <p>A {@link TreeCache} is created on {@link #PATH}; the registered listener prints one line
 * for every node that is added, updated or removed. A small command-line loop lets the user
 * create/update, delete and list nodes below {@link #PATH} so the events can be observed.
 *
 * <p>The class can be driven either from {@link #main(String[])} or by a container that calls
 * {@link #start()} on initialisation and {@link #destroy()} on shutdown.
 *
 * Created by yanan.sun on 16-10-19.
 */
@Component
public class TreeCacheExample {

    /** Root node watched by the cache; all commands operate on nodes below it. */
    private static final String PATH = "/mq_monitor";

    /** ZooKeeper connect string used by the client. */
    private static final String CONNECT_STRING = "192.168.27.55:2181";

    // NOTE(review): the client connects to CONNECT_STRING, not to this embedded test server —
    // confirm whether the test server is still needed.
    private static final TestingServer server = buildTestServer();

    private static final CuratorFramework client =
            CuratorFrameworkFactory.newClient(CONNECT_STRING, new ExponentialBackoffRetry(1000, 3));

    private static final TreeCache treeCache = new TreeCache(client, PATH);

    /**
     * Creates the embedded test server.
     *
     * @return the server, or {@code null} when it could not be created
     */
    private static TestingServer buildTestServer() {
        try {
            return new TestingServer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Starts the client and cache, runs the command loop until the user quits or stdin ends,
     * and always releases the cache, client and test server afterwards.
     */
    public static void main(String[] args) {
        start();
        try {
            processCommands(client, treeCache);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Without this the cache, client and test server were never closed.
            closeAll();
        }
    }

    /**
     * Registers the logging listener and starts the client and the cache.
     *
     * <p>The listener is registered here (before the cache starts) so that it is also active
     * when this method is used as a container init-method and the command loop never runs.
     * Failures are reported on the console; they are not rethrown.
     */
    public static void start() {
        try {
            addListener(treeCache);
            client.start();
            treeCache.start();
        } catch (Exception e) {
            System.out.println("error occurs: " + e);
            e.printStackTrace();
        }
    }

    /**
     * Attaches a listener that prints the name of every node added, removed or updated.
     *
     * @param cache the cache to observe
     */
    private static void addListener(TreeCache cache) {
        // A TreeCacheListener is optional. Here, it's used just to log changes.
        TreeCacheListener treeCacheListener = new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case NODE_ADDED:
                        System.out.println("Node add " + nodeName(event));
                        break;
                    case NODE_REMOVED:
                        System.out.println("Node removed " + nodeName(event));
                        break;
                    case NODE_UPDATED:
                        System.out.println("Node updated " + nodeName(event));
                        break;
                    default:
                        // Any other event type is ignored.
                        break;
                }
            }
        };
        cache.getListenable().addListener(treeCacheListener);
    }

    /** Returns the last path segment of the node the event refers to. */
    private static String nodeName(TreeCacheEvent event) {
        return ZKPaths.getNodeFromPath(event.getData().getPath());
    }

    /** Container shutdown hook: closes the cache, the client and the test server. */
    public void destroy() {
        closeAll();
    }

    /** Closes the cache, then the client, then the test server, ignoring close failures. */
    private static void closeAll() {
        CloseableUtils.closeQuietly(treeCache);
        CloseableUtils.closeQuietly(client);
        CloseableUtils.closeQuietly(server);
    }

    /**
     * Simple command-line processor: reads commands from stdin until "quit"/"q" or end of input.
     *
     * @param client the started Curator client used to modify nodes
     * @param cache  the started cache used by the {@code list} command
     * @throws Exception if reading stdin or a ZooKeeper operation fails
     */
    private static void processCommands(CuratorFramework client, TreeCache cache) throws Exception {
        printHelp();
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        boolean done = false;
        while (!done) {
            System.out.print("> ");
            String line = in.readLine();
            if (line == null) {
                break;
            }
            String command = line.trim();
            if (command.isEmpty()) {
                // "".split(...) yields [""], so a length check on the parts never catches this.
                continue;
            }
            // "\\s+" so that repeated blanks between tokens do not create empty arguments.
            String[] parts = command.split("\\s+");
            String operation = parts[0];
            String[] args = Arrays.copyOfRange(parts, 1, parts.length);
            if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                printHelp();
            } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                done = true;
            } else if (operation.equals("set")) {
                setValue(client, command, args);
            } else if (operation.equals("remove")) {
                remove(client, command, args);
            } else if (operation.equals("list")) {
                list(cache);
            }
            Thread.sleep(1000); // just to allow the console output to catch up
        }
    }

    /**
     * Prints every node currently cached below {@link #PATH} as {@code path = value},
     * or {@code * empty *} when the cache holds no children for it.
     *
     * @param cache the cache to read from
     */
    private static void list(TreeCache cache) {
        if (!printChildren(cache, PATH)) {
            System.out.println("* empty *");
        }
    }

    /**
     * Recursively prints the cached descendants of {@code parent}.
     *
     * @return {@code true} if at least one child was printed
     */
    private static boolean printChildren(TreeCache cache, String parent) {
        Map<String, ChildData> children = cache.getCurrentChildren(parent);
        if (children == null || children.isEmpty()) {
            return false;
        }
        for (ChildData child : children.values()) {
            byte[] payload = child.getData();
            String value = payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
            System.out.println(child.getPath() + " = " + value);
            printChildren(cache, child.getPath());
        }
        return true;
    }

    /**
     * Handles {@code remove <path>}: deletes the node {@code PATH/<path>}.
     * A node that does not exist is silently ignored.
     *
     * @param client  the Curator client
     * @param command the full command line, echoed in the syntax error message
     * @param args    the command arguments; exactly one is expected
     * @throws Exception if the delete fails for any reason other than a missing node
     */
    private static void remove(CuratorFramework client, String command, String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("syntax error (expected remove <path>): " + command);
            return;
        }
        // makePath joins the segments with exactly one separator, even if the name starts with "/".
        String path = ZKPaths.makePath(PATH, args[0]);
        try {
            client.delete().forPath(path);
        } catch (KeeperException.NoNodeException ignored) {
            // Nothing to delete: removing an absent node is treated as success.
        }
    }

    /**
     * Handles {@code set <path> <value>}: updates the node {@code PATH/<path>} with the value,
     * creating it (and any missing parents) when it does not exist yet, as the help text states.
     *
     * @param client  the Curator client
     * @param command the full command line, echoed in the syntax error message
     * @param args    the command arguments; exactly two are expected
     * @throws Exception if the ZooKeeper operation fails
     */
    private static void setValue(CuratorFramework client, String command, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("syntax error (expected set <path> <value>): " + command);
            return;
        }
        String path = ZKPaths.makePath(PATH, args[0]);
        byte[] bytes = args[1].getBytes(StandardCharsets.UTF_8);
        try {
            client.setData().forPath(path, bytes);
        } catch (KeeperException.NoNodeException e) {
            // The node is not there yet: fall back to creating it.
            client.create().creatingParentsIfNeeded().forPath(path, bytes);
        }
    }

    /** Prints the list of supported commands. */
    private static void printHelp() {
        System.out.println("An example of using TreeCache. This example is driven by entering commands at the prompt:\n");
        System.out.println("set <name> <value>: Adds or updates a node with the given name");
        System.out.println("remove <name>: Deletes the node with the given name");
        System.out.println("list: List the nodes/values in the cache");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}
依賴
<dependencies>
    <!-- Keep every Curator module on the same version so releases are not mixed on the classpath. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-test</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-x-discovery</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-beans</artifactId>
        <version>4.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>4.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>4.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>
擴充套件
如果與spring結合,希望容器啟動的時候同時對節點進行監聽,則可以去掉類裡面的main方法,並在spring中做如下配置:
<!-- Registers cache.TreeCacheExample as a bean: Spring calls start() once the bean is initialised and destroy() when the context shuts down. -->
<bean id="treeCacheExample" class="cache.TreeCacheExample" init-method="start" destroy-method="destroy"></bean>