zookeeper配置程式碼實現
阿新 • • 發佈:2022-04-22
package com.msb.zk.ZkTest; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; /** * @author lcc * @version V1.0 * @Package com.msb.zk.ZkTest * @date 2022/4/22 10:30 */ /* 獲得zk */ public class ZkUtils { private static ZooKeeper zk; private static String address="192.168.1.136:2181,192.168.1.137:2181,192.168.1.138:2181,192.168.1.139:2181/testConf"; private static DefaultWatch watch=new DefaultWatch(); private static CountDownLatch init= new CountDownLatch(1);//這裡定義計數器 是為了防止直接return zk 因為還沒有去連結zk 就直接返回了 //什麼時候去減? 再connected 之後再去減 public static ZooKeeper getZk(){ //這裡new zk 的時候需要傳入一個watch try { zk = newZooKeeper(address,1000,watch); //這裡還有一個非同步的操作 是去連結 建立session 的統一檢視的過程 //在減之前設定 將init 穿過去讓defaultwatch 執行--操作 watch.setCc(init); init.await();//阻塞等待 CountDownLatch為0 可用了 再執行return System.out.println("阻塞解除,已成功連結"); } catch (Exception e) { e.printStackTrace(); }return zk; } }
package com.msb.zk.ZkTest; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @author lcc * @version V1.0 * @Package com.msb.zk.ZkTest * @date 2022/4/22 10:59 */ public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { //因為 判斷是否存在時需要watcher 和statCallback 而getData 需要Watcher 和dataCallBack ZooKeeper zk; MyConf conf; CountDownLatch cc= new CountDownLatch(1); public MyConf getConf() { return conf; } public void setConf(MyConf conf) { this.conf = conf; } public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } public void aWait(){ //如果就這樣的話還是 直接非同步 就結束了 獲取不到值 需要阻塞 就需要引入CountDownLatch zk.exists("/AppConf", this,this , "oldData"); try { cc.await();//等待 資料返回 節點資料存在 且資料取完了 } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void processResult(int i, String s, Object o, byte[] data, Stat stat) { //判斷存在後再執行獲取資料 if(data!=null){ //取到資料了 String s1 = new String(data); //將從zk獲取到的data 資料 轉換成字串 然後通過conf類接受 kafka 傳過來的資料 conf.setConf(s1);//說明取到資料了 cc.countDown();//這裡可以釋放執行緒阻塞 } } @Override public void processResult(int i, String s, Object o, Stat stat) { //狀態 判斷是否存在 if(stat!=null){ //如果狀態不為空說明 有資料 那麼就可以取資料 zk.getData("/AppConf", this, this, "AAA"); } } @Override public void process(WatchedEvent watchedEvent) { //節點發生變化的環節 修改 switch (watchedEvent.getType()) { case None: break; case NodeCreated: System.out.println("節點資料建立"); zk.getData("/AppConf", this, this, "AAA"); break; case NodeDeleted: System.out.println("節點資料被刪除"); conf.setConf(""); //當刪除時 設定conf 內容是空 cc=new CountDownLatch(1); //這裡重新設定計數器為1 等待檔案被建立後減一之後 釋放阻塞執行緒 break; case NodeDataChanged: System.out.println("節點資料變更"); zk.getData("/AppConf", this, this, "AAA"); break; case NodeChildrenChanged: break; } } }
package com.msb.zk.ZkTest; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; /** * @author lcc * @version V1.0 * @Package com.msb.zk.ZkTest * @date 2022/4/22 10:36 */ public class DefaultWatch implements Watcher { CountDownLatch cc; //這裡引用CountDownLatch 是為了連結成功之後減一 就可以方形那邊的阻塞await方法 public void setCc(CountDownLatch cc) { this.cc = cc; } //這個default watch 是建立zk 使用的watch 與session 有關 @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); //這裡是watch 監聽zk 的事件 switch (watchedEvent.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: //在connected 之後才能執行減1 操作 cc.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; } } }
package com.msb.zk.ZkTest; /** * @author lcc * @version V1.0 * @Package com.msb.zk.ZkTest * @date 2022/4/22 11:12 */ /*這個類才是你最關心的地方 這裡可能不止是放入的是字串 可能是json 可能是xml * */ public class MyConf { private String conf; public String getConf() { return conf; } public void setConf(String conf) { this.conf = conf; } }
package com.msb.zk.ZkTest; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; /** * @author lcc * @version V1.0 * @Package com.msb.zk.ZkTest * @date 2022/4/22 10:29 */ /* * 獲得zk * 使用zk * 斷開zk 測試 */ public class TestConfig { ZooKeeper zk; @Before public void conn(){ //先執行這裡的方法 設定zk zk = ZkUtils.getZk(); } @After public void close(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void getConf() throws InterruptedException { //怎麼去取? 目錄是否存在 先判斷目錄是否存在 如果存在 一定會執行回撥方法 那麼就可以執行取得操作 //因為我們在判斷是否存在的時候 需要寫兩個watcher 和 callback 都是匿名內部類 而且 有資料後 執行取操作時 zk.getData 還有匿名內部類 //所以用一個工具類替代 簡化程式碼 WatchCallBack watchCallBack=new WatchCallBack(); //將這裡的zk 傳過去 watchCallBack.setZk(zk); //因為這是非同步的方式 這裡 不會阻塞 不會的等待callback返回的值 繼續向下走 //另外一個 watchcall back 的獲取到的資料怎麼讓這個執行緒知道 那麼我們需要建立一個接受資料的類 MyConf myConf = new MyConf(); //需要讓WatchCallBack 接受資料 watchCallBack.setConf(myConf); //如果myconf 資料更新或者修改了 watchCallBack能取到結果 能獲取到kafka傳遞過來的結果 //注意zk.exists 是非同步的 如果不阻塞 那麼會執行下面的方法 獲取的值是空 //只有執行了callBack裡面獲取到值了才能繼續向下走 watchCallBack.aWait();//這裡我們將方法定義在watchCallBack裡面的方法裡 如果獲得到資料 往下走 如果還沒獲取到資料 就在這裡阻塞著 /*1節點不存在 雖然會執行getstat 但是狀態為空 無法取到資料 那麼就會一直阻塞 一直監聽是否存在資料 //一旦建立一個節點 該節點會呼叫getData 裡面的 watcher 和callback 就會有狀態有資料了 執行緒不阻塞了 2節點存在 * */ while (true){ if(myConf.getConf().equals("")){ System.out.println("conf diu le "); watchCallBack.aWait();//這裡如果conf 內容為空說明檔案被刪除 那麼久應該阻塞,等待檔案建立 }else { System.out.println(myConf.getConf());//將watchCallBack裡面的資料取出來 } Thread.sleep(200); } } }