第7章 Apache Curator客戶端的使用
阿新 • • 發佈:2018-12-05
Apache Curator客戶端的使用
- 7-1 curator簡介與客戶端之間的異同點
- 7-2 搭建maven工程,建立curator與zkserver的連線
- 7-3 zk名稱空間以及建立節點
- 7-9 zk-watcher例項 統一更新N臺節點的配置檔案
- 7-10 curator之acl許可權操作與認證授權
7-1 curator簡介與客戶端之間的異同點
7-2 搭建maven工程,建立curator與zkserver的連線
這個是需要引入的依賴:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.11</version> </dependency>
前面兩種是比較推薦的:
當構建好了RetryPolicy物件了之後,就可以用建造者模式來進行構建客戶端:
public class CuratorOperator {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.1.110:2181";
/**
* 例項化zk客戶端
*/
public CuratorOperator() {
/**
* 同步建立zk示例,原生api是非同步的
*
* curator連結zookeeper的策略:ExponentialBackoffRetry
* baseSleepTimeMs:初始sleep的時間
* maxRetries:最大重試次數
* maxSleepMs:最大重試時間
*/
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
/**
* curator連結zookeeper的策略:RetryNTimes
* n:重試的次數
* sleepMsBetweenRetries:每次重試間隔的時間
*/
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
/**
* curator連結zookeeper的策略:RetryOneTime
* sleepMsBetweenRetry:每次重試間隔的時間
*/
// RetryPolicy retryPolicy2 = new RetryOneTime(3000);
/**
* 永遠重試,不推薦使用
*/
// RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
/**
* curator連結zookeeper的策略:RetryUntilElapsed
* maxElapsedTimeMs:最大重試時間
* sleepMsBetweenRetries:每次重試間隔
* 重試時間超過maxElapsedTimeMs後,就不再重試
*/
// RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
/**
*
* @Description: 關閉zk客戶端連線
*/
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
// 建立節點
String nodePath = "/super/imooc";
// byte[] data = "superme".getBytes();
// cto.client.create().creatingParentsIfNeeded()
// .withMode(CreateMode.PERSISTENT)
// .withACL(Ids.OPEN_ACL_UNSAFE)
// .forPath(nodePath, data);
// 更新節點資料
// byte[] newData = "batman".getBytes();
// cto.client.setData().withVersion(0).forPath(nodePath, newData);
// 刪除節點
// cto.client.delete()
// .guaranteed() // 如果刪除失敗,那麼在後端還是繼續會刪除,直到成功
// .deletingChildrenIfNeeded() // 如果有子節點,就刪除
// .withVersion(0)
// .forPath(nodePath);
// 讀取節點資料
// Stat stat = new Stat();
// byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
// System.out.println("節點" + nodePath + "的資料為: " + new String(data));
// System.out.println("該節點的版本號為: " + stat.getVersion());
// 查詢子節點
// List<String> childNodes = cto.client.getChildren()
// .forPath(nodePath);
// System.out.println("開始列印子節點:");
// for (String s : childNodes) {
// System.out.println(s);
// }
// 判斷節點是否存在,如果不存在則為空
// Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");
// System.out.println(statExist);
// watcher 事件 當使用usingWatcher的時候,監聽只會觸發一次,監聽完畢後就銷燬
// cto.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
// cto.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
// 為節點新增watcher
// NodeCache: 監聽資料節點的變更,會觸發事件
// final NodeCache nodeCache = new NodeCache(cto.client, nodePath);
// // buildInitial : 初始化的時候獲取node的值並且快取
// nodeCache.start(true);
// if (nodeCache.getCurrentData() != null) {
// System.out.println("節點初始化資料為:" + new String(nodeCache.getCurrentData().getData()));
// } else {
// System.out.println("節點初始化資料為空...");
// }
// nodeCache.getListenable().addListener(new NodeCacheListener() {
// public void nodeChanged() throws Exception {
// if (nodeCache.getCurrentData() == null) {
// System.out.println("空");
// return;
// }
// String data = new String(nodeCache.getCurrentData().getData());
// System.out.println("節點路徑:" + nodeCache.getCurrentData().getPath() + "資料:" + data);
// }
// });
// 為子節點新增watcher
// PathChildrenCache: 監聽資料節點的增刪改,會觸發事件
String childNodePathCache = nodePath;
// cacheData: 設定快取節點的資料狀態
final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件
* NORMAL:非同步初始化
* BUILD_INITIAL_CACHE:同步初始化
*/
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("當前資料節點的子節點資料列表:");
for (ChildData cd : childDataList) {
String childData = new String(cd.getData());
System.out.println(childData);
}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子節點初始化ok...");
}
else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
if (path.equals(ADD_PATH)) {
System.out.println("新增子節點:" + event.getData().getPath());
System.out.println("子節點資料:" + new String(event.getData().getData()));
} else if (path.equals("/super/imooc/e")) {
System.out.println("新增不正確...");
}
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("刪除子節點:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子節點路徑:" + event.getData().getPath());
System.out.println("修改子節點資料:" + new String(event.getData().getData()));
}
}
});
Thread.sleep(100000);
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
public final static String ADD_PATH = "/super/imooc/d";
}
7-3 zk名稱空間以及建立節點
public static void main(String[] args) throws Exception {
// 例項化
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
// 建立節點
String nodePath = "/super/ldc";
byte[] data = "superme".getBytes();
cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
}
7-9 zk-watcher例項 統一更新N臺節點的配置檔案
有3個客戶端類:
public class Client1 {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.1.110:2181";
public Client1() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
// public final static String CONFIG_NODE = "/super/imooc/redis-config";
public final static String CONFIG_NODE_PATH = "/super/imooc";
public final static String SUB_PATH = "/redis-config";
public static CountDownLatch countDown = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
Client1 cto = new Client1();
System.out.println("client1 啟動成功...");
final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
// 新增監聽事件
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
// 監聽節點變化
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
String configNodePath = event.getData().getPath();
if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
System.out.println("監聽到配置發生變化,節點路徑為:" + configNodePath);
// 讀取節點資料
String jsonConfig = new String(event.getData().getData());
System.out.println("節點" + CONFIG_NODE_PATH + "的資料為: " + jsonConfig);
// 從json轉換配置
RedisConfig redisConfig = null;
if (StringUtils.isNotBlank(jsonConfig)) {
redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
}
// 配置不為空則進行相應操作
if (redisConfig != null) {
String type = redisConfig.getType();
String url = redisConfig.getUrl();
String remark = redisConfig.getRemark();
// 判斷事件
if (type.equals("add")) {
System.out.println("監聽到新增的配置,準備下載...");
// ... 連線ftp伺服器,根據url找到相應的配置
Thread.sleep(500);
System.out.println("開始下載新的配置檔案,下載路徑為<" + url + ">");
// ... 下載配置到你指定的目錄
Thread.sleep(1000);
System.out.println("下載成功,已經新增到專案中");
// ... 拷貝檔案到專案目錄
} else if (type.equals("update")) {
System.out.println("監聽到更新的配置,準備下載...");
// ... 連線ftp伺服器,根據url找到相應的配置
Thread.sleep(500);
System.out.println("開始下載配置檔案,下載路徑為<" + url + ">");
// ... 下載配置到你指定的目錄
Thread.sleep(1000);
System.out.println("下載成功...");
System.out.println("刪除專案中原配置檔案...");
Thread.sleep(100);
// ... 刪除原檔案
System.out.println("拷貝配置檔案到專案目錄...");
// ... 拷貝檔案到專案目錄
} else if (type.equals("delete")) {
System.out.println("監聽到需要刪除配置");
System.out.println("刪除專案中原配置檔案...");
}
// TODO 視情況統一重啟服務
}
}
}
}
});
countDown.await();
cto.closeZKClient();
}
}
7-10 curator之acl許可權操作與認證授權
public class CuratorAcl {
public CuratorFramework client = null;
public static final String zkServerPath = "192.168.1.110:2181";
public CuratorAcl() {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder().authorization("digest", "imooc1:123456".getBytes())
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
// 例項化
CuratorAcl cto = new CuratorAcl();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
String nodePath = "/acl/father/child/sub";
List<ACL> acls = new ArrayList<ACL>();
Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456"));
Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456"));
acls.add(new ACL(Perms.ALL, imooc1));
acls.add(new ACL(Perms.READ, imooc2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2));
// 建立節點
// byte[] data = "spiderman".getBytes();
// cto.client.create().creatingParentsIfNeeded()
// .withMode(CreateMode.PERSISTENT)
// .withACL(acls, true)
// .forPath(nodePath, data);
cto.client.setACL().withACL(acls).forPath("/curatorNode");
// 更新節點資料
// byte[] newData = "batman".getBytes();
// cto.client.setData().withVersion(0).forPath(nodePath, newData);
// 刪除節點
// cto.client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(0).forPath(nodePath);
// 讀取節點資料
// Stat stat = new Stat();
// byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
// System.out.println("節點" + nodePath + "的資料為: " + new String(data));
// System.out.println("該節點的版本號為: " + stat.getVersion());
cto.closeZKClient();
boolean isZkCuratorStarted2 = cto.client.isStarted();
System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
}
}