Apache Curator簡單使用(一)
阿新 • • 發佈:2019-02-15
轉載自: http://ifeve.com/zookeeper-curato-framework/
http://blog.csdn.net/dc_726/article/details/46475633
http://macrochen.iteye.com/blog/1366136
Curator是Netflix開源的一套ZooKeeper客戶端框架,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。
Curator主要解決了三類問題:
• 封裝ZooKeeper client與ZooKeeper server之間的連線處理;
• 提供了一套Fluent風格的操作API;
•提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 叢集領導選舉機制)的抽象封裝.
Curator主要從以下幾個方面降低了zk使用的複雜性:
重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略, 並且內部也提供了幾種標準的重試策略(比如指數補償).
連線狀態監控: Curator初始化之後會一直的對zk連線進行監聽, 一旦發現連線狀態發生變化, 將作出相應的處理.
zk客戶端例項管理:Curator對zk客戶端到server叢集連線進行管理. 並在需要的情況, 重建zk例項, 保證與zk叢集的可靠連線.
各種使用場景支援:Curator實現zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景), 這些實現都遵循了zk的最佳實踐, 並考慮了各種極端情況.
Curator有四個包,curator-client,curator-test,curator-framework,curator-recipes,其中curator-client和curator-test是最基礎的,curator-framework依賴這兩個包,curator-recipes又依賴curator-framework。Curator組成部分有:
• Client: 是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法.
• Framework: 用來簡化ZooKeeper高階功能的使用, 並增加了一些新的功能, 比如管理到ZooKeeper叢集的連線, 重試處理
• Recipes: 實現了通用ZooKeeper的recipe, 該元件基於Framework
• Utilities:各種ZooKeeper的工具類
• Errors: 異常處理, 連線, 恢復等.
• Extensions: 對curator-recipes的擴充套件實現,拆分為 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基於RESTful的Recipes WEB服務.
名稱空間(Namespace)
因為一個zk叢集會被多個應用共享, 為了避免各個應用的zk patch衝突, Curator Framework內部會給每一個Curator Framework例項分配一個namespace(可選). 這樣在create ZNode的時候都會自動加上這個namespace作為這個node path的root. 使用程式碼如下:
方法說明:
• create(): 發起一個create操作. 可以組合其他方法 (比如mode 或background) 最後以forPath()方法結尾
• delete(): 發起一個刪除操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾
• checkExists(): 發起一個檢查ZNode 是否存在的操作. 可以組合其他方法(watch 或background) 最後以forPath()方法結尾
• getData(): 發起一個獲取ZNode資料的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾
• setData(): 發起一個設定ZNode資料的操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾
• getChildren(): 發起一個獲取ZNode子節點的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾
• inTransaction(): 發起一個ZooKeeper事務. 可以組合create, setData, check, 和/或delete 為一個操作, 然後commit() 提交
監聽器
Curator提供了三種Watcher(Cache)來監聽結點的變化:
• Path Cache:監視一個路徑下直接子結點的建立、刪除、以及結點資料的更新。產生的事件會傳遞給註冊的PathChildrenCacheListener。
• Node Cache:僅監視指定結點的建立、更新、刪除。產生的事件會傳遞給註冊的NodeCacheListener。
• Tree Cache:監視路徑下的子結點(所有子結節,不管有多少層子結點)的建立、更新、刪除事件。產生的事件會傳遞給註冊的TreeCacheListener。
簡單示例:
建立client
http://blog.csdn.net/dc_726/article/details/46475633
http://macrochen.iteye.com/blog/1366136
Curator是Netflix開源的一套ZooKeeper客戶端框架,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。
Curator主要解決了三類問題:
• 封裝ZooKeeper client與ZooKeeper server之間的連線處理;
• 提供了一套Fluent風格的操作API;
•提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 叢集領導選舉機制)的抽象封裝.
Curator主要從以下幾個方面降低了zk使用的複雜性:
重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略, 並且內部也提供了幾種標準的重試策略(比如指數補償).
連線狀態監控: Curator初始化之後會一直的對zk連線進行監聽, 一旦發現連線狀態發生變化, 將作出相應的處理.
zk客戶端例項管理:Curator對zk客戶端到server叢集連線進行管理. 並在需要的情況, 重建zk例項, 保證與zk叢集的可靠連線.
各種使用場景支援:Curator實現zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景), 這些實現都遵循了zk的最佳實踐, 並考慮了各種極端情況.
Curator有四個包,curator-client,curator-test,curator-framework,curator-recipes,其中curator-client和curator-test是最基礎的,curator-framework依賴這兩個包,curator-recipes又依賴curator-framework。Curator組成部分有:
• Client: 是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法.
• Framework: 用來簡化ZooKeeper高階功能的使用, 並增加了一些新的功能, 比如管理到ZooKeeper叢集的連線, 重試處理
• Recipes: 實現了通用ZooKeeper的recipe, 該元件基於Framework
• Utilities:各種ZooKeeper的工具類
• Errors: 異常處理, 連線, 恢復等.
• Extensions: 對curator-recipes的擴充套件實現,拆分為 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基於RESTful的Recipes WEB服務.
名稱空間(Namespace)
因為一個zk叢集會被多個應用共享, 為了避免各個應用的zk patch衝突, Curator Framework內部會給每一個Curator Framework例項分配一個namespace(可選). 這樣在create ZNode的時候都會自動加上這個namespace作為這個node path的root. 使用程式碼如下:
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
…
client.create().forPath("/test", data);// node was actually written to: "/MyApp/test",namespace也可以是多級” MyApp/app1”
方法說明:
• create(): 發起一個create操作. 可以組合其他方法 (比如mode 或background) 最後以forPath()方法結尾
• delete(): 發起一個刪除操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾
• checkExists(): 發起一個檢查ZNode 是否存在的操作. 可以組合其他方法(watch 或background) 最後以forPath()方法結尾
• getData(): 發起一個獲取ZNode資料的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾
• setData(): 發起一個設定ZNode資料的操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾
• getChildren(): 發起一個獲取ZNode子節點的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾
• inTransaction(): 發起一個ZooKeeper事務. 可以組合create, setData, check, 和/或delete 為一個操作, 然後commit() 提交
監聽器
Curator提供了三種Watcher(Cache)來監聽結點的變化:
• Path Cache:監視一個路徑下直接子結點的建立、刪除、以及結點資料的更新。產生的事件會傳遞給註冊的PathChildrenCacheListener。
• Node Cache:僅監視指定結點的建立、更新、刪除。產生的事件會傳遞給註冊的NodeCacheListener。
• Tree Cache:監視路徑下的子結點(所有子結節,不管有多少層子結點)的建立、更新、刪除事件。產生的事件會傳遞給註冊的TreeCacheListener。
簡單示例:
建立client
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3)); client.start();
節點操作:CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); //fluent style String namespace = "cluster-worker"; CuratorFramework client = builder.connectString("127.0.0.1:2181") .sessionTimeoutMs(30000) .connectionTimeoutMs(30000) .canBeReadOnly(false) .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)) .namespace(namespace) .defaultData(null) .build(); client.start();
//節點建立
if (client.checkExists().forPath("/curator") == null)
client.create().withMode(CreateMode.PERSISTENT).forPath("/curator");
//獲取子節點
System.out.println(client.getChildren().forPath("/curator"));
//設定並獲取資料
client.setData().forPath("/curator", "zero".getBytes());
System.out.println(client.getData().forPath("/curator"));
//刪除節點
client.delete().forPath("/curator");
監視器: PathChildrenCache watcher = new PathChildrenCache(client, "/curator", true);
watcher.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
}
});
watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
final NodeCache nodeCache = new NodeCache(client, C_PATH);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("catch node data change");
ChildData childData = nodeCache.getCurrentData();
if(childData == null){
System.out.println("===delete, path=" + C_PATH + ", childData=" + childData);
}else{
System.out.println("===update or add, path=" + C_PATH + ", childData=" + new String(childData.getData(), CHARSET));
}
}
});
nodeCache.start();
final TreeCache treeCache = new TreeCache(client, C_PATH);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("catch tree change");
if(event.getData() == null){
System.out.println("===init," + event.getType());
return;
}
if(event.getData().getData() == null){
System.out.println("===delete," + event.getType() + "," + event.getData().getPath());
}else{
System.out.println("===update or add," + event.getType() + "," + event.getData().getPath() + "," + new String(event.getData().getData(), TreeListener.CHARSET));
}
}
});
treeCache.start();