Curator 操作 zookeeper 全面講解
阿新 • • 發佈:2018-12-29
zookeeper 的安裝與叢集的搭建 請參考我的另一片文章 https://blog.csdn.net/weixin_40461281/article/details/85336396
首先 建立一個maven專案 (不細講了,不會的自行百度)
匯入curator jar包
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.11.0</version> </dependency>
new 一個類 名為 CuratorCreateSessionDemo
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; /** * Curator 建立session連線 demo */ public class CuratorCreateSessionDemo { // 叢集連線地址 private final static String CONNECTSTRING = "***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181"; public static void main(String[] args){ // 建立會話的兩種方式 normal CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient( CONNECTSTRING,5000,5000,new ExponentialBackoffRetry(1000,3)); curatorFramework.start(); // fluent 風格 CuratorFramework build = CuratorFrameworkFactory.builder() .connectString(CONNECTSTRING) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) // 異常重試策略 .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // 名稱空間(以後建立節點都在這個節點之下) .namespace("/curator") .build(); // 開啟連線 build.start(); System.out.println("success"); } }
為了便於之後的使用 我將連線編寫為 工具類 CuratorClientUtils
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; /** * Curator客戶端工具類 */ public class CuratorClientUtils { // 叢集連線地址 private final static String CONNECTSTRING = "***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181"; private static CuratorFramework curatorFramework; public static CuratorFramework getInstance(){ curatorFramework = CuratorFrameworkFactory.newClient( CONNECTSTRING,5000,5000,new ExponentialBackoffRetry(1000,3)); curatorFramework.start(); return curatorFramework; } }
API 操作
建立節點 curator 可以直接建立父節點與子節點 (底層為遞迴建立) 返回值為建立節點的名稱
// 建立節點
String s = curatorFramework.create()
// 同時建立父節點與子節點
.creatingParentContainersIfNeeded()
// PERSISTENT 永久節點 EPHEMERAL 臨時節點
.withMode(CreateMode.PERSISTENT)
.forPath("/curator/curator1/curator11", "123".getBytes());
System.out.println(s);
刪除節點 將上一步建立的 /curator 節點刪除
curator 可以直接指定 父節點進行刪除 會將該父節點下的子節點全部刪除 (底層為遞迴操作)
// 刪除節點
curatorFramework.delete()
// 同時刪除父節點與子節點
.deletingChildrenIfNeeded()
.forPath("/curator");
System.out.println("刪除成功...........");
查詢/獲取節點資料
// 查詢/獲取資料
Stat stat = new Stat();
byte[] bytes1 = curatorFramework.getData().storingStatIn(stat).forPath("/curator/curator1/curator11");
System.out.println(new String(bytes1) + " ---> stat: " + stat);
修改節點資料
// 修改資料
Stat stat1 = curatorFramework.setData().forPath("/curator", "456".getBytes());
System.out.println(stat1);
// 查詢/獲取資料
Stat stat2 = new Stat();
byte[] bytes2 = curatorFramework.getData().storingStatIn(stat2).forPath("/curator");
System.out.println(new String(bytes2) + " ---> stat: " + stat2);
非同步操作
// 非同步操作
// 建立一個執行緒池
ExecutorService service = Executors.newFixedThreadPool(1);
// 建立同步計數器 數值為1
CountDownLatch countDownLatch = new CountDownLatch(1);
curatorFramework.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
// 非同步執行
.inBackground((curatorFramework1, curatorEvent) -> {
// 輸出處理執行緒名稱 結果號 執行型別
System.out.println(Thread.currentThread().getName() + " -> resultCode" +
curatorEvent.getResultCode() + " -> " + curatorEvent.getType());
// 計數器遞減
countDownLatch.countDown();
}, service).forPath("/demo","789".getBytes());
// 最後一個引數為 一個執行緒池(可不填) 表示將任務交給執行緒池來處理
// 阻塞執行緒 等待計數器歸零
countDownLatch.await();
// 關閉執行緒池
service.shutdown();
事務操作
操作一個錯誤節點
// 事務操作
Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
.create().forPath("/trans", "111".getBytes())
// 操作一個不存在的節點 會報錯並回退
.and().setData().forPath("/xxxx", "111".getBytes())
.and().commit();
for (CuratorTransactionResult result:commit){
System.out.println(result.getForPath()+" -> "+ result.getType());
}
報錯節點不存在
操作一個正確節點
// 事務操作
Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
.create().forPath("/trans", "111".getBytes())
// 操作一個正確節點
.and().setData().forPath("/curator", "111".getBytes())
.and().commit();
for (CuratorTransactionResult result:commit){
System.out.println(result.getForPath()+" -> "+ result.getType());
}
整體程式碼
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Curator API 操作 demo
*/
public class CuratorOperatorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
System.out.println("連線成功...........");
// 建立節點
String s = curatorFramework.create()
// 同時建立父節點與子節點
.creatingParentContainersIfNeeded()
// PERSISTENT 永久節點 EPHEMERAL 臨時節點
.withMode(CreateMode.PERSISTENT)
.forPath("/curator/curator1/curator11", "123".getBytes());
System.out.println("建立節點: "+s + "內容為 123");
// 刪除節點
curatorFramework.delete()
// 同時刪除父節點與子節點
.deletingChildrenIfNeeded()
.forPath("/curator");
System.out.println("刪除成功...........");
// 查詢/獲取資料
Stat stat = new Stat();
byte[] bytes1 = curatorFramework.getData().storingStatIn(stat).forPath("/curator/curator1/curator11");
System.out.println(new String(bytes1) + " ---> stat: " + stat);
// 修改資料
Stat stat1 = curatorFramework.setData().forPath("/curator", "456".getBytes());
System.out.println("修改成功: 內容已修改為 456 stat: "+stat1);
// 查詢/獲取資料
Stat stat2 = new Stat();
byte[] bytes2 = curatorFramework.getData().storingStatIn(stat2).forPath("/curator");
System.out.println(new String(bytes2) + " ---> stat: " + stat2);
// 非同步操作
// 建立一個執行緒池
ExecutorService service = Executors.newFixedThreadPool(1);
// 建立同步計數器 數值為1
CountDownLatch countDownLatch = new CountDownLatch(1);
curatorFramework.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
// 非同步執行
.inBackground((curatorFramework1, curatorEvent) -> {
// 輸出處理執行緒名稱 結果號 執行型別
System.out.println(Thread.currentThread().getName() + " -> resultCode" +
curatorEvent.getResultCode() + " -> " + curatorEvent.getType());
// 計數器遞減
countDownLatch.countDown();
}, service).forPath("/demo","789".getBytes());
// 最後一個引數為 一個執行緒池(可不填) 表示將任務交給執行緒池來處理
// 阻塞執行緒 等待計數器歸零
countDownLatch.await();
// 關閉執行緒池
service.shutdown();
// 事務操作
Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
.create().forPath("/trans", "111".getBytes())
// 操作一個正確節點
.and().setData().forPath("/curator", "111".getBytes())
// 操作一個不存在的節點 會報錯並回退
// .and().setData().forPath("/xxxx", "111".getBytes())
.and().commit();
for (CuratorTransactionResult result:commit){
System.out.println(result.getForPath()+" -> "+ result.getType());
}
}
}
事件監聽
直接送上程式碼 註釋很詳細
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.TimeUnit;
/**
* Curator 事件監聽 dmeo
*/
public class CuratorEventDemo {
/**
* 三種watcher來做節點的監聽
* pathcache 監聽一個路徑下子節點的 建立 刪除 更新
* NodeCache 監聽一個節點的 建立 更新 刪除
* TreeCache pathcache 加 nodecache 的合體 (監聽路徑下的 建立 更新 刪除事件)
* 快取路徑下的所有子節點的資料
*/
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
// 節點變化 NodeCache
// NodeCache cache = new NodeCache(curatorFramework,"/curator",false);
//
// // true 初始化設定
// cache.start(true);
//
// cache.getListenable().addListener(() ->
// System.out.println("節點資料發生變化,變化後的結果: " + new String(cache.getCurrentData().getData())));
//
// curatorFramework.setData().forPath("/curator","菲菲".getBytes());
// PathChildrenCache
PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/event", true);
/**
* Normal 初始化為空
* BUILD_INITIAL_CACHE 方法 return 之前 呼叫一個 rebuild 操作
* POST_INITIALIZED_EVENT cache 初始化後發出一個事件
*/
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {
// 事件還有很多,列舉三個
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
System.out.println("增加子節點");
break;
case CHILD_REMOVED:
System.out.println("刪除子節點");
break;
case CHILD_UPDATED:
System.out.println("更新子節點");
break;
default:
break;
}
});
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
TimeUnit.SECONDS.sleep(1);
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
TimeUnit.SECONDS.sleep(1);
curatorFramework.setData().forPath("/event/event1", "222".getBytes());
TimeUnit.SECONDS.sleep(1);
curatorFramework.delete().forPath("/event/event1");
// curatorFramework.delete().deletingChildrenIfNeeded().forPath("/event");
System.in.read();
}
}