1. 程式人生 > >基於Curator操作ZooKeeper(二)-Watcher操作

基於Curator操作ZooKeeper(二)-Watcher操作

Java原生API操作ZooKeeper可參看:

Java原生API操作Zookeeper(一)

Java原生API操作Zookeeper(二)

相關內容:

基於Curator操作ZooKeeper(一)-基本操作

基於Curator操作ZooKeeper(二)-Watcher操作-補充TreeCache

usingWatcher()方法

ZooKeeper Watcher監聽機制(資料變更的通知)(一)(應用)中介紹過,繫結事件只有三個操作:getData、exists、getChildren。

這個方法有兩個過載的方法,實現這兩個介面其實都差不多:

使用usingWatcher()方法監聽只會觸發一次,監聽完畢後就會銷燬。

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的重試策略:
		 *
		 * 1>ExponentialBackoffRetry【推薦】
			 * baseSleepTimeMs:初始sleep時間(ms)
			 * maxRetries:最大重試次數,超過時間就不連結了
			 * maxSleepMs:最大重試時間(ms)
		 *
		 * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
		   當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
		   可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
		   maxRetries引數控制了最大重試次數,以避免無限制的重試。
		 */
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/**
		 * curator連結zookeeper的策略:
		 * 2>RetryNTimes【推薦】
			 * n:重試的次數
			 * sleepMsBetweenRetries:每次重試間隔的時間(ms)
		 */
//		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/**
		 * curator連結zookeeper的策略:
		 * 3>RetryOneTime
		 * sleepMsBetweenRetry:只重試一次,重試間隔的時間
		 */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/**
		 * 4>
		 * 永遠重試,不推薦使用
		 */
//		RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/**
		 * curator連結zookeeper的策略:
		 * 5>RetryUntilElapsed
		 * maxElapsedTimeMs:最大重試時間
		 * sleepMsBetweenRetries:每次重試間隔
		 * 重試時間超過maxElapsedTimeMs後,就不再重試
		 */
//		RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

		//建立客戶端
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		/**
		 * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
		   CuratorFramework的sta rt)方法來完成會話的建立。
		 */
		client.start();
	}
	
	/**
	 * 
	 * @Description: 關閉zk客戶端連線
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        String nodePath = "/dongguabai/a";
        //建立節點
       /* byte[] data = "abcd".getBytes();
        cto.client.create()
                .creatingParentContainersIfNeeded()   //遞迴建立節點
                .withMode(CreateMode.PERSISTENT)      //節點模式
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
                .forPath(nodePath,data); //不指定內容,則內容為空*/

        //獲取節點
       /* byte[] bytes = cto.client.getData().forPath(nodePath);
        System.out.println("第一次獲取節點資料為:"+new String(bytes));

        Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("第二次獲取節點資料為:"+new String(bytes1));
        System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));
*/

       cto.client.getData().usingWatcher((CuratorWatcher) event -> {
		   System.out.println("觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType());
	   }).forPath(nodePath);
        //獲取子節點
      /*  List<String> list = cto.client.getChildren().forPath(nodePath);
        System.out.println("開始列印子節點:");
        list.forEach(result-> System.out.println(result));
        System.out.println("列印結束!");*/

        //修改節點
      /*  Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes());
        System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));

        Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes());
        System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));*/

        //刪除節點
       /* Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("獲取節點資料為:"+new String(bytes1));
        cto.client.delete()
                .guaranteed()  //防止網路抖動,只要客戶端會話有效,那麼Curator 會在後臺持續進行刪除操作,直到節點刪除成功
                .deletingChildrenIfNeeded()  //如果有子節點會刪除,注意除非人為刪除namespace,否則namespace不會刪除
                .withVersion(stat.getVersion())
                .forPath(nodePath);*/

        Thread.sleep(300000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

執行程式後,在客戶端執行:

可以看到控制檯輸出了,可以看出沒有輸出namespace:

但是再在客戶端執行相同的操作,控制檯沒有輸出,說明事件只會觸發一次。

ZooKeeper原生支援通過註冊Watcher來進行事件監聽,但是其使用並不是特別方便,需要開發人員自己反覆註冊Watcher,比較繁瑣。Curator引入了Cache來實現對ZooKeeper服務端事件的監聽。Cache是Curator中對事件監聽的包裝,其對事件的監聽其實可以近似看作是一個本地快取檢視和遠端ZooKeeper檢視的對比過程。同時Curator能夠自動為開發人員處理反覆註冊監聽,從而大大簡化了原生API開發的繁瑣過程。Cache分為兩類監聽型別:節點監聽和子節點監聽。

NodeCache的使用

NodeCache有兩個建構函式:

//構造NodeCache例項
		NodeCache nodeCache = new NodeCache(cto.client,nodePath);
		//建立Cache
		//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。一般在開發中我們會設定為true。
		nodeCache.start();

通過程式碼測試看看:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorOperator2 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	public CuratorOperator2() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		client.start();
//client.start(true);
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator2 cto = new CuratorOperator2();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        String nodePath = "/dongguabai/a";

       cto.client.getData().usingWatcher((CuratorWatcher) event -> {
		   System.out.println("【使用usingWatcher】觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType());
	   }).forPath(nodePath);

       //NodeCache:監聽資料節點的變更,會觸發事件

		//構造NodeCache例項
		NodeCache nodeCache = new NodeCache(cto.client,nodePath);
		//建立Cache
		//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。
		nodeCache.start();
		if(nodeCache.getCurrentData()!=null){
			System.out.println("節點初始化資料為:"+new String(nodeCache.getCurrentData().getData()));
		}else {
			System.out.println("節點資料為空!");
		}

		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

執行結果:

測試事件觸發:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorOperator2 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";

	public CuratorOperator2() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator2 cto = new CuratorOperator2();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        String nodePath = "/dongguabai/a";

       cto.client.getData().usingWatcher((CuratorWatcher) event -> {
		   System.out.println("【使用usingWatcher】觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType());
	   }).forPath(nodePath);

       //NodeCache:監聽資料節點的變更,會觸發事件

		//構造NodeCache例項
		NodeCache nodeCache = new NodeCache(cto.client,nodePath);
		//建立Cache
		//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。
		nodeCache.start(true);
		if(nodeCache.getCurrentData()!=null){
			System.out.println("節點初始化資料為:"+new String(nodeCache.getCurrentData().getData()));
		}else {
			System.out.println("節點資料為空!");
		}

		//新增事件(也有remove),還可以知道Excutor
		nodeCache.getListenable().addListener(() -> {
			String data = new String(nodeCache.getCurrentData().getData());
			System.out.println("節點路徑:"+nodeCache.getCurrentData().getPath()+",節點資料為:"+data);
		});

		System.in.read();
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

執行後在客戶端操作:

控制檯輸出:

如果在客戶端執行刪除操作:

控制檯輸出,空指標是因為節點被刪除了:

NodeCache不僅可以用於監聽資料節點的內容變更,也能監聽指定節點是否存在。如果原本節點不存在,那麼Cache就會在節點被建立後觸發NodeCacheListenera。但是,如果該資料節點被刪除,那麼Curator就無法觸發NodeCacheListener了(後面版本已經修復了這個問題,網上有的資料這麼說是有問題的)。

程式碼測試:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

import java.time.LocalDateTime;

public class CuratorOperator2 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";

	public CuratorOperator2() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator2 cto = new CuratorOperator2();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        String nodePath = "/dongguabai/a";

     /*  cto.client.getData().usingWatcher((CuratorWatcher) event -> {
		   System.out.println("【使用usingWatcher】觸發了watcher事件,節點路徑為:"+event.getPath()+",事件型別為:"+event.getType());
	   }).forPath(nodePath);*/

       //NodeCache:監聽資料節點的變更,會觸發事件

		//構造NodeCache例項
		NodeCache nodeCache = new NodeCache(cto.client,nodePath);
		//建立Cache
		//該方法有個boolean型別的引數,預設是false,如果設定為true,那麼NodeCache在第一次啟動的時候就會立刻從ZooKeeper上讀取對應節點的資料內容,並儲存在Cache中。
		nodeCache.start(true);
		if(nodeCache.getCurrentData()!=null){
			System.out.println("節點初始化資料為:"+new String(nodeCache.getCurrentData().getData()));
		}else {
			System.out.println("節點資料為空!");
		}

		//新增事件(也有remove)
		nodeCache.getListenable().addListener(() -> {
			/*String data = new String(nodeCache.getCurrentData().getData());
			System.out.println("節點路徑:"+nodeCache.getCurrentData().getPath()+",節點資料為:"+data);*/
			System.out.println(LocalDateTime.now()+"  |觸發節點事件!!!");
		});

		System.out.println("開始建立節點!");
		//建立節點
		cto.client.create()
				.creatingParentContainersIfNeeded()   //遞迴建立節點
				.withMode(CreateMode.PERSISTENT)      //節點模式
				.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
				.forPath(nodePath,"new Time".getBytes());

		System.in.read();
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

控制檯輸出:

在客戶端操作:

控制檯輸出:

PathChildrenCache

有時候我們需要監聽某一個節點具體的操作情況,這時候PathChildrenCache就派上用場了。PathChildrenCache用於監聽指定ZooKeeper資料節點的子節點變化情況。我們要監聽一個節點也沒有被刪除、新增、修改,我們只需要監聽這個節點的父節點即可,即監聽這個父節點以下所有的子節點,當某一個子節點發生了增刪改操作的時候都會被監聽到。

跟NodeCache類似,PathChildrenCache也需要呼叫start()來初始化:

資料準備:

同步非同步測試:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.List;

public class CuratorOperator3 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";

	public CuratorOperator3() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator3 cto = new CuratorOperator3();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

		String nodePath = "/dongguabai/a";
		//為子節點新增watcher
		PathChildrenCache childrenCache = new PathChildrenCache(cto.client,nodePath,true);
		/**
		 * StartMode:初始化方式
		 *  NORMAL:普通非同步初始化
		    BUILD_INITIAL_CACHE:同步初始化
		    POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件
		 */
		childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
//		childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//		childrenCache.start(PathChildrenCache.StartMode.NORMAL);
		List<ChildData> list = childrenCache.getCurrentData();
		System.out.println("獲取子節點列表:");
		//如果是BUILD_INITIAL_CACHE可以獲取這個資料,如果不是就不行
		list.forEach(childData -> {
			System.out.println(new String(childData.getData()));
		});


		System.in.read();
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

註冊事件測試:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

public class CuratorOperator3 {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.136:2181,192.168.220.137:2181";

	public CuratorOperator3() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		client.start();
	}
	
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator3 cto = new CuratorOperator3();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

		String nodePath = "/dongguabai/a";
		//為子節點新增watcher
		PathChildrenCache childrenCache = new PathChildrenCache(cto.client,nodePath,true);
		/**
		 * StartMode:初始化方式
		 *  NORMAL:普通非同步初始化
		    BUILD_INITIAL_CACHE:同步初始化
		    POST_INITIALIZED_EVENT:非同步初始化,初始化之後會觸發事件,而且所有的子節點的add操作都會來一遍這個也是比較坑的地方
		 */
//		childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
		childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//		childrenCache.start(PathChildrenCache.StartMode.NORMAL);
		/*List<ChildData> list = childrenCache.getCurrentData();
		System.out.println("獲取子節點列表:");
		//如果是BUILD_INITIAL_CACHE可以獲取這個資料,如果不是就不行
		list.forEach(childData -> {
			System.out.println(new String(childData.getData()));
		});*/

		//註冊事件,也可以加Excutor

		/**
		 * 當指定節點的子節點發生變化時,就會回撥該方法oPathChildrenCacheEvent
		   類中定義了所有的事件型別,主要包括新增子節點(CHILD_ADDED)、子節點資料
		   變更(CHILD_UPDATED)和子節點刪除(CHILD_REMOVED)三類。
		 */
		AtomicInteger atomicInteger = new AtomicInteger(0);
		childrenCache.getListenable().addListener(((client1, event) -> {
			atomicInteger.getAndIncrement();
			System.out.println("-----  "+LocalDateTime.now()+"  "+event.getType());
			if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
				System.out.println("子節點初始化成功...");
			}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
				String path = event.getData().getPath();
				System.out.println("新增子節點:" + event.getData().getPath());
				System.out.println("子節點資料:" + new String(event.getData().getData()));
			}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
				System.out.println("刪除子節點:" + event.getData().getPath());
			}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
				System.out.println("修改子節點路徑:" + event.getData().getPath());
				System.out.println("修改子節點資料:" + new String(event.getData().getData()));
			}

			//如果想指定節點可以判斷路徑
			/*String path = event.getData().getPath();
			if (path.equals(ADD_PATH)) {
				System.out.println("新增子節點:" + event.getData().getPath());
				System.out.println("子節點資料:" + new String(event.getData().getData()));
			} else if (path.equals("/super/imooc/e")) {
				System.out.println("新增不正確...");
			}*/
		}));

		Thread.sleep(1000);
		System.out.println("結果:"+atomicInteger);
		System.in.read();
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

啟動專案,先看控制檯:

在客戶端執行一些操作:

再看控制檯: