1. 程式人生 > >Apache Curator簡單使用(一)

Apache Curator簡單使用(一)

轉載自: http://ifeve.com/zookeeper-curato-framework/
               http://blog.csdn.net/dc_726/article/details/46475633
               http://macrochen.iteye.com/blog/1366136

       Curator是Netflix開源的一套ZooKeeper客戶端框架,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。
Curator主要解決了三類問題: 
• 封裝ZooKeeper client與ZooKeeper server之間的連線處理; 
• 提供了一套Fluent風格的操作API; 
•提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 叢集領導選舉機制)的抽象封裝. 

Curator主要從以下幾個方面降低了zk使用的複雜性: 
重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略, 並且內部也提供了幾種標準的重試策略(比如指數補償). 
連線狀態監控: Curator初始化之後會一直的對zk連線進行監聽, 一旦發現連線狀態發生變化, 將作出相應的處理.
zk客戶端例項管理:Curator對zk客戶端到server叢集連線進行管理. 並在需要的情況, 重建zk例項, 保證與zk叢集的可靠連線.
各種使用場景支援:Curator實現zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景), 這些實現都遵循了zk的最佳實踐, 並考慮了各種極端情況.

Curator有四個包,curator-client,curator-test,curator-framework,curator-recipes,其中curator-client和curator-test是最基礎的,curator-framework依賴這兩個包,curator-recipes又依賴curator-framework。Curator組成部分有:
• Client: 是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法. 
• Framework: 用來簡化ZooKeeper高階功能的使用, 並增加了一些新的功能, 比如管理到ZooKeeper叢集的連線, 重試處理 
• Recipes: 實現了通用ZooKeeper的recipe, 該元件基於Framework
• Utilities:各種ZooKeeper的工具類 
• Errors: 異常處理, 連線, 恢復等. 
• Extensions: 對curator-recipes的擴充套件實現,拆分為 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基於RESTful的Recipes WEB服務.

名稱空間(Namespace)
因為一個zk叢集會被多個應用共享, 為了避免各個應用的zk patch衝突, Curator Framework內部會給每一個Curator Framework例項分配一個namespace(可選). 這樣在create ZNode的時候都會自動加上這個namespace作為這個node path的root. 使用程式碼如下: 
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();  
…  
client.create().forPath("/test", data);// node was actually written to: "/MyApp/test",namespace也可以是多級” MyApp/app1”


方法說明: 
• create(): 發起一個create操作. 可以組合其他方法 (比如mode 或background) 最後以forPath()方法結尾 
• delete(): 發起一個刪除操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾 
• checkExists(): 發起一個檢查ZNode 是否存在的操作. 可以組合其他方法(watch 或background) 最後以forPath()方法結尾 
• getData(): 發起一個獲取ZNode資料的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾 
• setData(): 發起一個設定ZNode資料的操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾 
• getChildren(): 發起一個獲取ZNode子節點的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾 
• inTransaction(): 發起一個ZooKeeper事務. 可以組合create, setData, check, 和/或delete 為一個操作, 然後commit() 提交 

監聽器
Curator提供了三種Watcher(Cache)來監聽結點的變化:
• Path Cache:監視一個路徑下直接子結點的建立、刪除、以及結點資料的更新。產生的事件會傳遞給註冊的PathChildrenCacheListener。
• Node Cache:僅監視指定結點的建立、更新、刪除。產生的事件會傳遞給註冊的NodeCacheListener。
• Tree Cache:監視路徑下的子結點(所有子結節,不管有多少層子結點)的建立、更新、刪除事件。產生的事件會傳遞給註冊的TreeCacheListener。

簡單示例:
建立client
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3));
client.start();
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
//fluent style
String namespace = "cluster-worker";
CuratorFramework client = builder.connectString("127.0.0.1:2181")
		.sessionTimeoutMs(30000)
		.connectionTimeoutMs(30000)
		.canBeReadOnly(false)
		.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
		.namespace(namespace)
		.defaultData(null)
		.build();
client.start();
節點操作:
//節點建立
if (client.checkExists().forPath("/curator") == null)
    client.create().withMode(CreateMode.PERSISTENT).forPath("/curator");


//獲取子節點
System.out.println(client.getChildren().forPath("/curator"));


//設定並獲取資料
client.setData().forPath("/curator", "zero".getBytes());
System.out.println(client.getData().forPath("/curator"));


//刪除節點
client.delete().forPath("/curator");
監視器:
        PathChildrenCache watcher = new PathChildrenCache(client, "/curator", true);
        watcher.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if (data == null) {
                    System.out.println("No data in event[" + event + "]");
                } else {
                    System.out.println("Receive event: "
                            + "type=[" + event.getType() + "]"
                            + ", path=[" + data.getPath() + "]"
                            + ", data=[" + new String(data.getData()) + "]"
                            + ", stat=[" + data.getStat() + "]");
                }
            }
        });
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        final NodeCache nodeCache = new NodeCache(client, C_PATH);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("catch node data change");
                ChildData childData = nodeCache.getCurrentData();
                if(childData == null){
                    System.out.println("===delete, path=" + C_PATH + ", childData=" + childData);
                }else{
                    System.out.println("===update or add, path=" + C_PATH + ", childData=" + new String(childData.getData(), CHARSET));
                }
            }
        });
        nodeCache.start();
        final TreeCache treeCache = new TreeCache(client, C_PATH);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("catch tree change");
                if(event.getData() == null){
                    System.out.println("===init," + event.getType());
                    return;
                }


                if(event.getData().getData() == null){
                    System.out.println("===delete," + event.getType() + "," + event.getData().getPath());
                }else{
                    System.out.println("===update or add," + event.getType() + "," + event.getData().getPath() + "," + new String(event.getData().getData(), TreeListener.CHARSET));
                }
            }
        });
        treeCache.start();