1. 程式人生 > >第7章 Apache Curator客戶端的使用

第7章 Apache Curator客戶端的使用

Apache Curator客戶端的使用

7-1 curator簡介與客戶端之間的異同點

在這裡插入圖片描述


在這裡插入圖片描述


在這裡插入圖片描述


在這裡插入圖片描述


7-2 搭建maven工程,建立curator與zkserver的連線

這個是需要引入的依賴:

		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-framework</artifactId>
			<version>4.0.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>4.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.11</version>
		</dependency>

前面兩種是比較推薦的:
當構建好了RetryPolicy物件了之後,就可以用建造者模式來進行構建客戶端:

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.1.110:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的策略:ExponentialBackoffRetry
		 * baseSleepTimeMs:初始sleep的時間
		 * maxRetries:最大重試次數
		 * maxSleepMs:最大重試時間
		 */
// RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); /** * curator連結zookeeper的策略:RetryNTimes * n:重試的次數 * sleepMsBetweenRetries:每次重試間隔的時間 */ RetryPolicy retryPolicy = new RetryNTimes(3, 5000); /** * curator連結zookeeper的策略:RetryOneTime * sleepMsBetweenRetry:每次重試間隔的時間 */
// RetryPolicy retryPolicy2 = new RetryOneTime(3000); /** * 永遠重試,不推薦使用 */ // RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs) /** * curator連結zookeeper的策略:RetryUntilElapsed * maxElapsedTimeMs:最大重試時間 * sleepMsBetweenRetries:每次重試間隔 * 重試時間超過maxElapsedTimeMs後,就不再重試 */ // RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000); client = CuratorFrameworkFactory.builder() .connectString(zkServerPath) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace").build(); client.start(); } /** * * @Description: 關閉zk客戶端連線 */ public void closeZKClient() { if (client != null) { this.client.close(); } } public static void main(String[] args) throws Exception { // 例項化 CuratorOperator cto = new CuratorOperator(); boolean isZkCuratorStarted = cto.client.isStarted(); System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉")); // 建立節點 String nodePath = "/super/imooc"; // byte[] data = "superme".getBytes(); // cto.client.create().creatingParentsIfNeeded() // .withMode(CreateMode.PERSISTENT) // .withACL(Ids.OPEN_ACL_UNSAFE) // .forPath(nodePath, data); // 更新節點資料 // byte[] newData = "batman".getBytes(); // cto.client.setData().withVersion(0).forPath(nodePath, newData); // 刪除節點 // cto.client.delete() // .guaranteed() // 如果刪除失敗,那麼在後端還是繼續會刪除,直到成功 // .deletingChildrenIfNeeded() // 如果有子節點,就刪除 // .withVersion(0) // .forPath(nodePath); // 讀取節點資料 // Stat stat = new Stat(); // byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath); // System.out.println("節點" + nodePath + "的資料為: " + new String(data)); // System.out.println("該節點的版本號為: " + stat.getVersion()); // 查詢子節點 // List<String> childNodes = cto.client.getChildren() // .forPath(nodePath); // System.out.println("開始列印子節點:"); // for (String s : childNodes) { // System.out.println(s); // } // 判斷節點是否存在,如果不存在則為空 // Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc"); // System.out.println(statExist); // watcher 事件 當使用usingWatcher的時候,監聽只會觸發一次,監聽完畢後就銷燬 // cto.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath); // cto.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath); // 為節點新增watcher // NodeCache: 監聽資料節點的變更,會觸發事件 // final NodeCache nodeCache = new NodeCache(cto.client, nodePath); // // buildInitial : 初始化的時候獲取node的值並且快取 // nodeCache.start(true); // if (nodeCache.getCurrentData() != null) { // System.out.println("節點初始化資料為:" + new String(nodeCache.getCurrentData().getData())); // } else { // System.out.println("節點初始化資料為空..."); // } // nodeCache.getListenable().addListener(new NodeCacheListener() { // public void nodeChanged() throws Exception { // if (nodeCache.getCurrentData() == null) { // System.out.println("空"); // return; // } // String data = new String(nodeCache.getCurrentData().getData()); // System.out.println("節點路徑:" + nodeCache.getCurrentData().getPath() + "資料:" + data); // } // }); // 為子節點新增watcher // PathChildrenCache: 監聽資料節點的增刪改,會觸發事件 String childNodePathCache = nodePath; // cacheData: 設定快取節點的資料狀態 final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true); /** * StartMode: 初始化方式 * POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件 * NORMAL:非同步初始化 * BUILD_INITIAL_CACHE:同步初始化 */ childrenCache.start(StartMode.POST_INITIALIZED_EVENT); List<ChildData> childDataList = childrenCache.getCurrentData(); System.out.println("當前資料節點的子節點資料列表:"); for (ChildData cd : childDataList) { String childData = new String(cd.getData()); System.out.println(childData); } childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){ System.out.println("子節點初始化ok..."); } else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ String path = event.getData().getPath(); if (path.equals(ADD_PATH)) { System.out.println("新增子節點:" + event.getData().getPath()); System.out.println("子節點資料:" + new String(event.getData().getData())); } else if (path.equals("/super/imooc/e")) { System.out.println("新增不正確..."); } }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ System.out.println("刪除子節點:" + event.getData().getPath()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("修改子節點路徑:" + event.getData().getPath()); System.out.println("修改子節點資料:" + new String(event.getData().getData())); } } }); Thread.sleep(100000); cto.closeZKClient(); boolean isZkCuratorStarted2 = cto.client.isStarted(); System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉")); } public final static String ADD_PATH = "/super/imooc/d"; }

7-3 zk名稱空間以及建立節點

	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
		// 建立節點
		String nodePath = "/super/ldc";
		byte[] data = "superme".getBytes();
		cto.client.create().creatingParentsIfNeeded()
			.withMode(CreateMode.PERSISTENT)
			.withACL(Ids.OPEN_ACL_UNSAFE)
			.forPath(nodePath, data);
	}		

7-9 zk-watcher例項 統一更新N臺節點的配置檔案

在這裡插入圖片描述
有3個客戶端類:

public class Client1 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.1.110:2181";

	public Client1() {
		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		client = CuratorFrameworkFactory.builder()
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("workspace").build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
//	public final static String CONFIG_NODE = "/super/imooc/redis-config";
	public final static String CONFIG_NODE_PATH = "/super/imooc";
	public final static String SUB_PATH = "/redis-config";
	public static CountDownLatch countDown = new CountDownLatch(1);
	
	public static void main(String[] args) throws Exception {
		Client1 cto = new Client1();
		System.out.println("client1 啟動成功...");
		
		final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);
		childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
		
		// 新增監聽事件
		childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				// 監聽節點變化
				if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
					String configNodePath = event.getData().getPath();
					if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {
						System.out.println("監聽到配置發生變化,節點路徑為:" + configNodePath);
						
						// 讀取節點資料
						String jsonConfig = new String(event.getData().getData());
						System.out.println("節點" + CONFIG_NODE_PATH + "的資料為: " + jsonConfig);
						
						// 從json轉換配置
						RedisConfig redisConfig = null;
						if (StringUtils.isNotBlank(jsonConfig)) {
							redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);
						}
						
						// 配置不為空則進行相應操作
						if (redisConfig != null) {
							String type = redisConfig.getType();
							String url = redisConfig.getUrl();
							String remark = redisConfig.getRemark();
							// 判斷事件
							if (type.equals("add")) {
								System.out.println("監聽到新增的配置,準備下載...");
								// ... 連線ftp伺服器,根據url找到相應的配置
								Thread.sleep(500);
								System.out.println("開始下載新的配置檔案,下載路徑為<" + url + ">");
								// ... 下載配置到你指定的目錄
								Thread.sleep(1000);
								System.out.println("下載成功,已經新增到專案中");
								// ... 拷貝檔案到專案目錄
							} else if (type.equals("update")) {
								System.out.println("監聽到更新的配置,準備下載...");
								// ... 連線ftp伺服器,根據url找到相應的配置
								Thread.sleep(500);
								System.out.println("開始下載配置檔案,下載路徑為<" + url + ">");
								// ... 下載配置到你指定的目錄
								Thread.sleep(1000);
								System.out.println("下載成功...");
								System.out.println("刪除專案中原配置檔案...");
								Thread.sleep(100);
								// ... 刪除原檔案
								System.out.println("拷貝配置檔案到專案目錄...");
								// ... 拷貝檔案到專案目錄
							} else if (type.equals("delete")) {
								System.out.println("監聽到需要刪除配置");
								System.out.println("刪除專案中原配置檔案...");
							}
							
							// TODO 視情況統一重啟服務
						}
					}
				}
			}
		});
		
		countDown.await();
		
		cto.closeZKClient();
	}
	
}

7-10 curator之acl許可權操作與認證授權

public class CuratorAcl {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.1.110:2181";

	public CuratorAcl() {
		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		client = CuratorFrameworkFactory.builder().authorization("digest", "imooc1:123456".getBytes())
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("workspace").build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorAcl cto = new CuratorAcl();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
		
		String nodePath = "/acl/father/child/sub";
		
		List<ACL> acls = new ArrayList<ACL>();
		Id imooc1 = new Id("digest", AclUtils.getDigestUserPwd("imooc1:123456"));
		Id imooc2 = new Id("digest", AclUtils.getDigestUserPwd("imooc2:123456"));
		acls.add(new ACL(Perms.ALL, imooc1));
		acls.add(new ACL(Perms.READ, imooc2));
		acls.add(new ACL(Perms.DELETE | Perms.CREATE, imooc2));
		
		// 建立節點
//		byte[] data = "spiderman".getBytes();
//		cto.client.create().creatingParentsIfNeeded()
//				.withMode(CreateMode.PERSISTENT)
//				.withACL(acls, true)
//				.forPath(nodePath, data);
		

		cto.client.setACL().withACL(acls).forPath("/curatorNode");
		
		// 更新節點資料
//		byte[] newData = "batman".getBytes();
//		cto.client.setData().withVersion(0).forPath(nodePath, newData);
		
		// 刪除節點
//		cto.client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(0).forPath(nodePath);
		
		// 讀取節點資料
//		Stat stat = new Stat();
//		byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
//		System.out.println("節點" + nodePath + "的資料為: " + new String(data));
//		System.out.println("該節點的版本號為: " + stat.getVersion());
		
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}