1. 程式人生 > 其它 >zookeeper配置程式碼實現

zookeeper配置程式碼實現

package com.msb.zk.ZkTest;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
 * @author lcc
 * @version V1.0
 * @Package com.msb.zk.ZkTest
 * @date 2022/4/22 10:30
 */
/*
獲得zk
*/
public class ZkUtils {
    private static ZooKeeper zk;
    private static String address="192.168.1.136:2181,192.168.1.137:2181,192.168.1.138:2181,192.168.1.139:2181/testConf
"; private static DefaultWatch watch=new DefaultWatch(); private static CountDownLatch init= new CountDownLatch(1);//這裡定義計數器 是為了防止直接return zk 因為還沒有去連結zk 就直接返回了 //什麼時候去減? 再connected 之後再去減 public static ZooKeeper getZk(){ //這裡new zk 的時候需要傳入一個watch try { zk = new
ZooKeeper(address,1000,watch); //這裡還有一個非同步的操作 是去連結 建立session 的統一檢視的過程 //在減之前設定 將init 穿過去讓defaultwatch 執行--操作 watch.setCc(init); init.await();//阻塞等待 CountDownLatch為0 可用了 再執行return System.out.println("阻塞解除,已成功連結"); } catch (Exception e) { e.printStackTrace(); }
return zk; } }
package com.msb.zk.ZkTest;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * @author lcc
 * @version V1.0
 * @Package com.msb.zk.ZkTest
 * @date 2022/4/22 10:59
 */
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
    //因為 判斷是否存在時需要watcher 和statCallback   而getData 需要Watcher 和dataCallBack
    ZooKeeper zk;
    MyConf conf;
    CountDownLatch cc= new CountDownLatch(1);

    public MyConf getConf() {
        return conf;
    }

    public void setConf(MyConf conf) {
        this.conf = conf;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }


    public  void aWait(){
        //如果就這樣的話還是 直接非同步  就結束了 獲取不到值 需要阻塞 就需要引入CountDownLatch
        zk.exists("/AppConf", this,this , "oldData");
        try {
            cc.await();//等待 資料返回  節點資料存在 且資料取完了
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void processResult(int i, String s, Object o, byte[] data, Stat stat) {
        //判斷存在後再執行獲取資料
        if(data!=null){
            //取到資料了
            String s1 = new String(data);
            //將從zk獲取到的data 資料 轉換成字串 然後通過conf類接受 kafka 傳過來的資料
            conf.setConf(s1);//說明取到資料了
            cc.countDown();//這裡可以釋放執行緒阻塞
        }
    }

    @Override
    public void processResult(int i, String s, Object o, Stat stat) {
        //狀態  判斷是否存在
        if(stat!=null){
            //如果狀態不為空說明 有資料 那麼就可以取資料
            zk.getData("/AppConf", this, this, "AAA");
        }
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        //節點發生變化的環節 修改
        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
                System.out.println("節點資料建立");
                zk.getData("/AppConf", this, this, "AAA");
                break;
            case NodeDeleted:
                System.out.println("節點資料被刪除");
               conf.setConf("");
               //當刪除時 設定conf 內容是空
               cc=new CountDownLatch(1);
               //這裡重新設定計數器為1 等待檔案被建立後減一之後 釋放阻塞執行緒
                break;
            case NodeDataChanged:
                System.out.println("節點資料變更");
                zk.getData("/AppConf", this, this, "AAA");
                break;
            case NodeChildrenChanged:
                break;
        }


    }



}
package com.msb.zk.ZkTest;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

/**
 * @author lcc
 * @version V1.0
 * @Package com.msb.zk.ZkTest
 * @date 2022/4/22 10:36
 */
public class DefaultWatch implements Watcher {

    CountDownLatch cc;
    //這裡引用CountDownLatch 是為了連結成功之後減一 就可以方形那邊的阻塞await方法

    public void setCc(CountDownLatch cc) {
        this.cc = cc;
    }

    //這個default watch  是建立zk 使用的watch  與session 有關
    @Override
    public void process(WatchedEvent watchedEvent) {

        System.out.println(watchedEvent.toString());
        //這裡是watch 監聽zk 的事件
        switch (watchedEvent.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                //在connected 之後才能執行減1 操作
                cc.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }



    }
}
package com.msb.zk.ZkTest;

/**
 * @author lcc
 * @version V1.0
 * @Package com.msb.zk.ZkTest
 * @date 2022/4/22 11:12
 */
/*這個類才是你最關心的地方
這裡可能不止是放入的是字串 可能是json  可能是xml
* */
public class MyConf {


    private String conf;

    public String getConf() {
        return conf;
    }

    public void setConf(String conf) {
        this.conf = conf;
    }
}
package com.msb.zk.ZkTest;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @author lcc
 * @version V1.0
 * @Package com.msb.zk.ZkTest
 * @date 2022/4/22 10:29
 */

/*
* 獲得zk
* 使用zk
* 斷開zk

測試
 */

public class TestConfig {
    ZooKeeper zk;


    @Before
    public  void conn(){

        //先執行這裡的方法  設定zk
         zk = ZkUtils.getZk();


    }



    @After
    public void close(){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


    @Test
    public void getConf() throws InterruptedException {
        //怎麼去取?  目錄是否存在  先判斷目錄是否存在  如果存在  一定會執行回撥方法  那麼就可以執行取得操作
        //因為我們在判斷是否存在的時候 需要寫兩個watcher  和 callback 都是匿名內部類 而且 有資料後 執行取操作時 zk.getData 還有匿名內部類
        //所以用一個工具類替代  簡化程式碼
        WatchCallBack watchCallBack=new WatchCallBack();
        //將這裡的zk 傳過去
        watchCallBack.setZk(zk);
        //因為這是非同步的方式  這裡 不會阻塞 不會的等待callback返回的值 繼續向下走
        //另外一個 watchcall back 的獲取到的資料怎麼讓這個執行緒知道   那麼我們需要建立一個接受資料的類
        MyConf myConf = new MyConf();
        //需要讓WatchCallBack 接受資料
        watchCallBack.setConf(myConf);
        //如果myconf 資料更新或者修改了 watchCallBack能取到結果  能獲取到kafka傳遞過來的結果

        //注意zk.exists 是非同步的 如果不阻塞 那麼會執行下面的方法 獲取的值是空
        //只有執行了callBack裡面獲取到值了才能繼續向下走
        watchCallBack.aWait();//這裡我們將方法定義在watchCallBack裡面的方法裡  如果獲得到資料 往下走 如果還沒獲取到資料 就在這裡阻塞著

        /*1節點不存在
        雖然會執行getstat 但是狀態為空  無法取到資料  那麼就會一直阻塞 一直監聽是否存在資料
        //一旦建立一個節點 該節點會呼叫getData 裡面的 watcher 和callback
        就會有狀態有資料了 執行緒不阻塞了
        2節點存在
        * */
        while (true){

            if(myConf.getConf().equals("")){
                System.out.println("conf diu le ");
                watchCallBack.aWait();//這裡如果conf 內容為空說明檔案被刪除 那麼久應該阻塞,等待檔案建立
            }else {
                System.out.println(myConf.getConf());//將watchCallBack裡面的資料取出來
            }
            Thread.sleep(200);
        }


    }

}