1. 程式人生 > >CuratorFramework節點監聽二 TreeCache

CuratorFramework節點監聽二 TreeCache

節點監聽

2.TreeCache

特點:

(1)永久監聽指定節點下的節點的變化
(2)可以監聽到指定節點下所有層級節點的變化。例如指定節點為"/example",在其下新增"node1"可以監聽到,新增更深一層的"node1/n1"同樣也能被監聽到
(3)可以監聽到的事件:節點建立、節點資料的變化、節點刪除等

使用方式:

(1)建立curatorframework的client
(2)新增TreeCache
(3)啟動client 和 TreeCache
(4)註冊監聽器

例子

package cache;

import com.google.common.collect.Lists;
import discovery.ExampleServer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.springframework.stereotype.Component;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Created by yanan.sun on 16-10-19.
 *
 * <p>Demonstrates a {@link TreeCache} watching {@link #PATH} and everything below it.
 * Run {@link #main(String[])} for an interactive console, or let a container call
 * {@link #start()} and {@link #destroy()} as lifecycle callbacks.
 */
@Component
public class TreeCacheExample {

    /** Root node whose whole subtree is watched. */
    private static final String PATH = "/mq_monitor";

    // NOTE(review): this embedded server is started but the client below connects to a
    // fixed address instead of server.getConnectString() - confirm whether it is needed.
    private static final TestingServer server = buildTestServer();

    private static CuratorFramework client =
            CuratorFrameworkFactory.newClient("192.168.27.55:2181", new ExponentialBackoffRetry(1000, 3));

    private static TreeCache treeCache = new TreeCache(client, PATH);

    /**
     * Creates the in-process test server.
     *
     * @return the server, or {@code null} when it could not be created
     */
    private static TestingServer buildTestServer() {
        try {
            return new TestingServer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Console entry point: starts the client and cache, runs the command loop, and
     * closes everything once the loop ends.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        start();
        try {
            processCommands(client, treeCache);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Without this the client and cache stay open after "quit".
            shutdown();
        }
    }

    /**
     * Registers the logging listener, then starts the client and the cache.
     *
     * <p>The listener is registered here (and before the cache starts) so that callers
     * which never run the console loop still receive events. Failures are reported but
     * not rethrown.
     */
    public static void start() {
        try {
            addListener(treeCache);
            client.start();
            treeCache.start();
        } catch (Exception e) {
            System.out.println("error occurs");
            e.printStackTrace();
        }
    }

    /**
     * Attaches a listener that prints node additions, removals and updates.
     *
     * @param cache the cache to listen on
     */
    private static void addListener(TreeCache cache) {
        // The listener is optional; here it is used just to log changes.
        TreeCacheListener treeCacheListener = new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case NODE_ADDED:
                        System.out.println("Node add " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                        break;
                    case NODE_REMOVED:
                        System.out.println("Node removed " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                        break;
                    case NODE_UPDATED:
                        System.out.println("Node updated " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                        break;
                    default:
                        // Other event types are not logged.
                        break;
                }
            }
        };
        cache.getListenable().addListener(treeCacheListener);
    }

    /** Closes the cache, the client and the test server, ignoring close failures. */
    public void destroy() {
        shutdown();
    }

    /** Static counterpart of {@link #destroy()} so that {@code main} can release resources too. */
    private static void shutdown() {
        CloseableUtils.closeQuietly(treeCache);
        CloseableUtils.closeQuietly(client);
        CloseableUtils.closeQuietly(server);
    }

    /**
     * Reads commands from standard input until end of input or a quit command.
     *
     * @param client the started Curator client used for writes
     * @param cache  the started cache used by the {@code list} command
     * @throws Exception if reading input or a ZooKeeper operation fails
     */
    private static void processCommands(CuratorFramework client, TreeCache cache) throws Exception {
        // More scaffolding that does a simple command line processor
        printHelp();

        // Nothing is ever added to this list; it is unused scaffolding.
        List<ExampleServer> servers = Lists.newArrayList();
        try {
            // Deliberately not closed: closing the reader would close System.in.
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            boolean done = false;
            while (!done) {
                System.out.print("> ");

                String line = in.readLine();
                if (line == null) {
                    break;
                }

                String command = line.trim();
                // split() on an empty string yields one empty element, so test the text itself.
                if (command.isEmpty()) {
                    continue;
                }
                String[] parts = command.split("\\s+");
                String operation = parts[0];
                String[] args = Arrays.copyOfRange(parts, 1, parts.length);

                if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                    printHelp();
                } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                    done = true;
                } else if (operation.equals("set")) {
                    setValue(client, command, args);
                } else if (operation.equals("remove")) {
                    remove(client, command, args);
                } else if (operation.equals("list")) {
                    list(cache);
                }

                Thread.sleep(1000); // just to allow the console output to catch up
            }
        } finally {
            for (ExampleServer server : servers) {
                CloseableUtils.closeQuietly(server);
            }
        }
    }

    /**
     * Prints the direct children of {@link #PATH} currently held by the cache.
     *
     * @param cache the cache to read from
     */
    private static void list(TreeCache cache) {
        Map<String, ChildData> children = cache.getCurrentChildren(PATH);
        if (children == null || children.isEmpty()) {
            System.out.println("* empty *");
            return;
        }
        for (ChildData data : children.values()) {
            byte[] payload = data.getData();
            String value = (payload == null) ? "" : new String(payload, StandardCharsets.UTF_8);
            System.out.println(data.getPath() + " = " + value);
        }
    }

    /**
     * Deletes the node named by the single argument; a missing node is not an error.
     *
     * @param client  the Curator client
     * @param command the raw command line, echoed in the syntax error message
     * @param args    the command arguments; exactly one path relative to {@link #PATH}
     * @throws Exception if the delete fails for a reason other than the node being absent
     */
    private static void remove(CuratorFramework client, String command, String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("syntax error (expected remove <path>): " + command);
            return;
        }

        String path = ZKPaths.makePath(PATH, args[0]);
        try {
            client.delete().forPath(path);
        } catch (KeeperException.NoNodeException ignored) {
            // Already gone, which is the state the user asked for.
        }
    }

    /**
     * Updates the node named by the first argument, creating it (with parents) when absent.
     *
     * @param client  the Curator client
     * @param command the raw command line, echoed in the syntax error message
     * @param args    the command arguments; a path relative to {@link #PATH} and a value
     * @throws Exception if the ZooKeeper operation fails
     */
    private static void setValue(CuratorFramework client, String command, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("syntax error (expected set <path> <value>): " + command);
            return;
        }

        String path = ZKPaths.makePath(PATH, args[0]);
        byte[] bytes = args[1].getBytes(StandardCharsets.UTF_8);

        try {
            client.setData().forPath(path, bytes);
        } catch (KeeperException.NoNodeException e) {
            client.create().creatingParentsIfNeeded().forPath(path, bytes);
        }
    }

    /** Prints the list of supported console commands. */
    private static void printHelp() {
        System.out.println("An example of using TreeCache. This example is driven by entering commands at the prompt:\n");
        System.out.println("set <name> <value>: Adds or updates a node with the given name");
        System.out.println("remove <name>: Deletes the node with the given name");
        System.out.println("list: List the nodes/values in the cache");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}

依賴

<dependencies>
        <!-- All Curator modules are kept on the same version to avoid mixing releases. -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>4.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>4.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

擴充套件

如果與spring結合,希望容器啟動的時候同時對節點進行監聽,則可以去掉類裡面的main方法,並在spring中做如下配置:

<!-- Declares cache.TreeCacheExample as a bean, naming its start() method as the init callback and destroy() as the destroy callback. -->
<bean id="treeCacheExample" class="cache.TreeCacheExample" init-method="start" destroy-method="destroy"></bean>