Apache Curator客戶端
阿新 • • 發佈:2018-11-08
一:Apache Curator簡介
1. Curator主要從以下幾個方面降低了zk使用的複雜性
重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略,並且內部也提供了幾種標準的重試策略(比如指數補償)
連線狀態監控: Curator初始化之後會一直的對zk連線進行監聽, 一旦發現連線狀態發生變化, 將作出相應的處理
zk客戶端例項管理:Curator對zk客戶端到server叢集連線進行管理.並在需要的情況, 重建zk例項,保證與zk叢集的可靠連線
各種使用場景支援:Curator實現zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景),這些實現都遵循了zk的最佳實踐,並考慮了各種極端情況
2. Curator主要解決了三類問題
- 封裝ZooKeeper client與ZooKeeper server之間的連線處理
- 提供了一套Fluent風格的操作API
- 提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 叢集領導選舉機制)的抽象封裝
3. Curator聲稱的一些亮點
日誌工具
- 內部採用SLF4J 來輸出日誌
- 採用驅動器(driver)機制, 允許擴充套件和定製日誌和跟蹤處理
- 提供了一個TracerDriver介面, 通過實現addTrace()和addCount()介面來整合使用者自己的跟蹤框架
- 和Curator相比, 另一個ZooKeeper客戶端——zkClient的不足之處
- 文件幾乎沒有
- 異常處理弱爆了(簡單的丟擲RuntimeException)
- 重試處理太難用了
- 沒有提供各種使用場景的實現
- 對ZooKeeper自帶客戶端(ZooKeeper類)的”抱怨”
- 只是一個底層實現
- 要用需要自己寫大量的程式碼
- 很容易誤用
- 需要自己處理連線丟失, 重試等
4. Curator幾個組成部分
- Client: 是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法
- Framework: 用來簡化ZooKeeper高階功能的使用, 並增加了一些新的功能, 比如管理到ZooKeeper叢集的連線, 重試處理
- Recipes: 實現了通用ZooKeeper的recipe, 該元件建立在Framework的基礎之上
- Utilities:各種ZooKeeper的工具類
- Errors: 異常處理, 連線, 恢復等
- Extensions: recipe擴充套件
二:基本示例
1. 基本操作
public class CuratorTest {
static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
static final int sessionTimeoutMs = 5000;
static int connectionTimeoutMs = 3000;
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// namespace: 名稱空間,即根節點,當多個應用使用同一個zk時能夠避免衝突, 操作時不需要顯式使用該根名稱空間(根節點)
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy)
.namespace("myapp")
.build();
// 連線
client.start();
String nodePath = "/node1";
Stat stat = client.checkExists().forPath(nodePath);
if (stat != null) {
// 只能刪除葉子節點, version=-1 表示不需要校驗版本,如果版本不對會丟擲異常(BadVersion)
// version版本相同才會更新
client.setData().withVersion(-1).forPath(nodePath, "node1 new value".getBytes());
byte[] bytes = client.getData().forPath(nodePath);
System.out.println("node1=" + new String(bytes));
// 讀取節點資料,獲取該節點的Stat
Stat stat1 = new Stat();
client.getData().storingStatIn(stat1).forPath(nodePath);
System.out.println("stat1=" + stat1);
client.delete().withVersion(-1).forPath(nodePath);
} else {
client.create().forPath(nodePath, "node1 default value".getBytes());
}
// creatingParentContainersIfNeeded遞迴建立節點
String result = client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/node1/node11/node111", "node111 init value".getBytes());
System.out.println(result);
client.setData().forPath("/node1/node11/node111", "node111 new value".getBytes());
client.create().withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.inBackground()
.forPath(nodePath + "/node2", "node2 init value".getBytes());
client.getChildren().forPath(nodePath).forEach(node -> {
try {
String fullPath = nodePath + "/" + node;
System.out.println(node + " = " + new String(client.getData().forPath(fullPath)));
} catch (Exception e) {
e.printStackTrace();
}
});
// 刪除當前節點和子節點
// guaranteed是一個保障措施只要客戶端會話有效那麼Curator會在後臺持續進行刪除操作,直到刪除節點成功。
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/node1");
Thread.sleep(1000);
// 不存在則建立,存在則更新
client.create().orSetData().forPath("/node3", "node3 new value".getBytes());
client.close();
}
}
2. 事務操作
public class CuratorTest {
static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
static final int sessionTimeoutMs = 5000;
static int connectionTimeoutMs = 3000;
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// namespace: 根節點,操作時不需要顯式使用該根名稱空間(根節點)
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy)
.namespace("myapp")
.build();
// 連線
client.start();
// 定義事務操作
CuratorOp createCuratorOp = client.transactionOp().create().forPath("/node1", "node1 init value".getBytes());
CuratorOp setCuratorOp = client.transactionOp().setData().forPath("/node1", "node1 new value".getBytes());
CuratorOp delCuratorOp = client.transactionOp().delete().forPath("/node1");
// 返回值:為每個操作的結果
List<CuratorTransactionResult> results = client.transaction().forOperations(createCuratorOp, setCuratorOp, delCuratorOp);
for (CuratorTransactionResult result: results) {
System.out.println("執行結果:" + result.getForPath() + "\t" + result.getType() + "\t" + result.getError() + "\t" + result.getResultStat());
}
client.close();
}
}
3.監聽
方式一:全域性監聽
// 只有inBackground()才能被監聽到
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) {
System.out.println(event.getType() + "\t" + event.getPath());
}
};
client.getCuratorListenable().addListener(listener);
// inBackground 後臺執行,即非同步執行
client.create().inBackground().forPath("/node1", "node1 value".getBytes());
client.setData().inBackground().forPath("/node1", "node1 new value".getBytes());
client.delete().inBackground().forPath("/node1");
// WATCHED null
// CREATE /node1
// SET_DATA /node1
// DELETE /node1
// CLOSING null
方式二:監聽單個操作
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println(event.getType() + "\t" + event.getPath());
}
};
// inBackground 後臺執行,即非同步執行
client.create().inBackground(callback).forPath("/node1", "node1 value".getBytes());
client.setData().inBackground(callback).forPath("/node1", "node1 new value".getBytes());
client.delete().inBackground(callback).forPath("/node1");
// CREATE /node1
// SET_DATA /node1
// DELETE /node1
方式三:TreeCache
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
NodeCache 好像只能監控刪除節點和修改節點,沒有監控建立節點
final NodeCache cache = new NodeCache(client, "/node1");
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("路徑=" + data.getPath() + "\t data=" + new String(data.getData()));
} else {
System.out.println("節點被刪除!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.create().forPath("/node1", "node1 value".getBytes());
client.setData().forPath("/node1", "node1 new value".getBytes());
client.delete().forPath("/node1");
cache.close();
// 路徑=/node1 data=node1 new value
// 節點被刪除!
PathChildrenCache 好像不能監控修改節點
PathChildrenCache cache = new PathChildrenCache(client, "/node1", true);
cache.start();
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件型別:" + event.getType());
if (null != event.getData()) {
System.out.println("節點資料:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(cacheListener);
client.create().creatingParentsIfNeeded().forPath("/node1/node11", "node1 value".getBytes());
client.setData().forPath("/node1/node11", "node1 new value".getBytes());
client.delete().deletingChildrenIfNeeded().forPath("/node1");
cache.close();
// 事件型別:CONNECTION_RECONNECTED
// 事件型別:CHILD_ADDED
// 節點資料:/node1/node11 = node1 new value
// 事件型別:CHILD_REMOVED
// 節點資料:/node1/node11 = node1 new value
TreeCache = PathCache + NodeCache 建立、修改、刪除都能監控到
TreeCacheListener cacheListener = (client1, event) -> {
System.out.println("事件型別:" + event.getType() +
"\t路徑:" + (null != event.getData() ? event.getData().getPath() : null));
};
TreeCache cache = new TreeCache(client, "/node1");
cache.start();
cache.getListenable().addListener(cacheListener);
client.create().creatingParentsIfNeeded().forPath("/node1/node11", "node1 value".getBytes());
client.setData().forPath("/node1/node11", "node1 new value".getBytes());
client.delete().deletingChildrenIfNeeded().forPath("/node1");
Thread.sleep(1000);
cache.close();
// 事件型別:INITIALIZED 路徑:null
// 事件型別:NODE_ADDED 路徑:/node1
// 事件型別:NODE_ADDED 路徑:/node1/node11
// 事件型別:NODE_UPDATED 路徑:/node1/node11
// 事件型別:NODE_REMOVED 路徑:/node1/node11
// 事件型別:NODE_REMOVED 路徑:/node1
ConnectionStateListener
監聽連線狀態,當連線丟失時去重新連線
public class CuratorTest {
static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
static final int sessionTimeoutMs = 5000;
static int connectionTimeoutMs = 3000;
static CountDownLatch countDownLatch = new CountDownLatch(1);
static CuratorFramework client;
static ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.CONNECTED) {
System.out.println("連線成功");
countDownLatch.countDown();
} else if (newState == ConnectionState.LOST) {
System.out.println("連線丟失");
try {
System.out.println("重新初始化開始");
reInitClient();
System.out.println("重新初始化完畢");
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
public static void main(String[] args) throws Exception {
initClient();
client.close();
}
public static void initClient() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// namespace: 根節點,操作時不需要顯式使用該根名稱空間(根節點)
client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy)
.namespace("myapp")
.build();
// 連線
client.start();
client.getConnectionStateListenable().addListener(connectionStateListener);
countDownLatch.await();
}
public static void reInitClient() throws Exception {
// 先關閉client
if (client != null) {
client.close();
client = null;
}
// 然後再初始化客戶端
initClient();
}
}
CuratorWatcher 好像只能監聽到setData
CuratorWatcher pathWatcher = new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
String path = event.getPath();
String value = new String(client.getData().forPath(path));
System.out.println(path + "=" +value);
}
System.out.println(event);
}
};
String path = "/node1";
client.create().forPath(path, "node1 value".getBytes());
String value = new String(client.getData().usingWatcher(pathWatcher).forPath(path));
System.out.println(path + "=" + value);
client.create().forPath(path + "/node11", "node1 value".getBytes());
client.setData().forPath(path, "node1 new value".getBytes());
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(5000);
// node1=node1 value
// node1=node1 new value
// WatchedEvent state:SyncConnected type:NodeDataChanged path:/node1