1. 程式人生 > >ZooKeeper=Util 重複註冊監聽節點

ZooKeeper=Util 重複註冊監聽節點

package com.easou.noveladmin.utils; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.commons.lang.StringUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.easou.noveladmin.model.ApiAppUpdateCodeCh; import com.easou.noveladmin.model.ApiConstants; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; public class ApiUtil { private static Logger logger = LoggerFactory.getLogger("zookeeperLogger"); private static CountDownLatch connectedLatch = new CountDownLatch(1); private static Watcher watcher = new ConnectedWatcher(connectedLatch); public static ZooKeeper zk = null; //public static String rootPath="/appUpdate"; public static boolean isInit_appUpdate=false; public static boolean isInit_appFound=false; static{ }   static class ConnectedWatcher implements Watcher {            private CountDownLatch connectedLatch;            ConnectedWatcher(CountDownLatch connectedLatch) {                this.connectedLatch = connectedLatch;            }            @Override            public void process(WatchedEvent event) {               if (event.getState() == KeeperState.SyncConnected) {                   connectedLatch.countDown();               }            }      }   public static Watcher wh =null;  /* public static Watcher wh = new Watcher() {    public void process(org.apache.zookeeper.WatchedEvent event) {     try {      String path = event.getPath();      EventType type = event.getType();      if(type.getIntValue()==3){       logger.info("######################  "+"資料改變"+ "路徑" + path + " 型別:" + event.getType());      }else if (type.getIntValue()==4) {       logger.info("######################  "+"子節點改變"+ "路徑" + path + " 型別:" + event.getType());       if(StringUtils.isNotBlank(path)){        logger.info("******************新增子節點,重新監聽路徑"+path);        addWatcher(zk, path);       }      }      if (StringUtils.isNotBlank(event.getPath())) {       logger.info("######################  "+"主動監聽:"+path);      }      if(StringUtils.isNotBlank(path)){       zk.getData(path, wh, null);      }else{       zk.getData(rootPath, wh, null);      }     } catch (Exception e) {      logger.error("獲取Wach失敗:",e.getMessage());     }    }   };*/   public static  Watcher initWatch(final String defaultRootPath){     wh  = new Watcher() {     public void process(org.apache.zookeeper.WatchedEvent event) {      try {       String path = event.getPath();       EventType type = event.getType();       if(type.getIntValue()==3){        logger.info("######################  "+"資料改變"+ "路徑" + path + " 型別:" + event.getType());       }else if (type.getIntValue()==4) {        logger.info("######################  "+"子節點改變"+ "路徑" + path + " 型別:" + event.getType());        if(StringUtils.isNotBlank(path)){         logger.info("******************新增子節點,重新監聽路徑"+path);         addWatcher(zk, path);        }       }       if (StringUtils.isNotBlank(event.getPath())) {        logger.info("######################  "+"主動監聽:"+path);       }       if(StringUtils.isNotBlank(path)){        zk.getData(path, initWatch(path), null);       }else{        zk.getData(defaultRootPath, initWatch(defaultRootPath), null);       }      } catch (Exception e) {       logger.error("獲取Wach失敗:",e.getMessage());      }     }    };    return wh;   }   public static void  addWatcher(ZooKeeper zke,final String path){    List<String> children =null;    try {      Watcher wh_c = wh;         children = zke.getChildren(path, wh_c);     if(children==null||children.size()<=0){      logger.info("主動監聽:"+path+"wh_c:"+wh_c);      zk.getData(path, wh_c, null);      return;     }else{      for (String str : children) {       if(StringUtils.isNotBlank(str)){        String pwd=path+"/"+str;        addWatcher(zke, pwd);       }      }     }     zk.getData(path, wh_c, null);     logger.info("主動監聽:父節點:"+path);    } catch (KeeperException e) {     // TODO Auto-generated catch block     e.printStackTrace();    } catch (InterruptedException e) {     // TODO Auto-generated catch block     e.printStackTrace();    }   }   public static String getValue(ZooKeeper zk,String key){    try {     long st = System.currentTimeMillis();     byte[] data = zk.getData(key, wh, null);     //byte[] data = zk.getData(key, false, null);     String val = new String(data, "UTF-8");     logger.info("獲取val用時:"+(System.currentTimeMillis()-st));     return val;    } catch (KeeperException e) {     e.printStackTrace();    } catch (InterruptedException e) {     e.printStackTrace();    }catch (UnsupportedEncodingException e) {     // TODO Auto-generated catch block     e.printStackTrace();    }    return "";   }   public static void setValue(ZooKeeper zk,String path,String value) throws Exception{    // 修改節點/root/childone下的資料,第三個引數為版本,如果是-1,那會無視被修改的資料版本,直接改掉     try {      long st = System.currentTimeMillis();      zk.setData(path,value.getBytes("UTF-8") , -1);      logger.info("設定val用時:"+(System.currentTimeMillis()-st));     } catch (Exception e) {      // TODO Auto-generated catch block      e.printStackTrace();      throw new Exception(e);     }   }   /**    * 獲取zookeeper連線,如果連線未建立完成,則程式阻塞new ZooKeeper(ZKPURL, TIMEOUT, wh)    * @param zkpUrl    * @param timeout    * @return    */   public static ZooKeeper getZooKeeperConnection(String zkpUrl, int timeout,String rootPath) {    long st = System.currentTimeMillis();    ZooKeeper zooKeeper = null;    try {     zooKeeper = new ZooKeeper(zkpUrl, timeout, watcher);     if (States.CONNECTING == zooKeeper.getState()) {      if (logger.isDebugEnabled())       logger.debug("zookeeper連線狀態:" + zooKeeper.getState());      try {       connectedLatch.await();       if (logger.isDebugEnabled())        logger.debug("zookeeper連線狀態:" + zooKeeper.getState());      } catch (InterruptedException e) {       e.printStackTrace();      }     }     if(ApiConstants.API_APPFOUND_ROOTPATH.equals(rootPath)){      if(!isInit_appFound){       logger.info("沒有初始化....");          initWatch(rootPath);       zk = new ZooKeeper(zkpUrl, 300000, wh);       addWatcher(zk, rootPath);       isInit_appFound=true;      }else{       logger.info("已經初始化....");      }     }else{      if(!isInit_appUpdate){       logger.info("沒有初始化....");          initWatch(rootPath);       zk = new ZooKeeper(zkpUrl, 300000, wh);       addWatcher(zk, rootPath);       isInit_appUpdate=true;      }else{       logger.info("已經初始化....");      }     } // if(!isInit){ // logger.info("沒有初始化...."); //    initWatch(rootPath); // zk = new ZooKeeper(zkpUrl, 300000, wh); // addWatcher(zk, rootPath); // isInit=true; // }else{ // logger.info("已經初始化...."); // }     logger.info("獲取連線用時:"+(System.currentTimeMillis()-st));    } catch (IOException e) {     logger.error("獲取連線失敗:isInit:"+isInit_appFound+"  zooKeeper:"+zooKeeper,e.getMessage());     e.printStackTrace();    }    return zooKeeper;   } // public static void saveWach(String rootPath){ // if(!isInit){ // logger.info("沒有初始化...."); //    initWatch(rootPath); // zk = new ZooKeeper(zkpUrl, 300000, wh); // addWatcher(zk, rootPath); // isInit=true; // }else{ // logger.info("已經初始化...."); // } // }   public static void delteAllNode(ZooKeeper zk,String path){     try {      List<String> children = zk.getChildren(path,true);      if(children != null && children.size()>0){       for (String url : children) {        delteAllNode(zk, path+"/"+url);       }      }      zk.delete(path, -1);     logger.info("刪除節點:"+path);    } catch (KeeperException e) {     // TODO Auto-generated catch block     e.printStackTrace();    } catch (InterruptedException e) {     // TODO Auto-generated catch block     e.printStackTrace();    }   }   public static void main(String[] args) {    //test_addAppUpdate();    String ZKPURL = "120.197.138.35:2181,120.197.138.35:2182,120.197.138.35:2183";    int timeout=3000;    ZooKeeper zk = ApiUtil.getZooKeeperConnection(ZKPURL, timeout,"");    try {     System.out.println(new String(zk.getData("/appUpdate/online/ios/noUpdate_key", wh, null)));     //setValue(zk, "/appUpdate/online/ios/desc", "");     System.out.println(new String(zk.getData("/appUpdate/online/ios/desc", wh, null)));    String str="[{'ch':'123','code':'1012'},{'ch':'abc','code':'1542'}]";    //ApiAppUpdateBean bean = JSON.parseObject(str, new ArrayList()<ApiAppUpdateCodeCh>.class);     Gson gson=new Gson();     List<ApiAppUpdateCodeCh> list=gson.fromJson(str,new TypeToken<List<ApiAppUpdateCodeCh>>() {}.getType());     System.out.println(list);     String json = gson.toJson(list);     System.out.println(json);    } catch (KeeperException e) {     // TODO Auto-generated catch block     e.printStackTrace();    } catch (InterruptedException e) {     // TODO Auto-generated catch block     e.printStackTrace();    }   } }

相關推薦

ZooKeeper=Util 重複註冊節點

package com.easou.noveladmin.utils; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.Iterator;

zookeeper 共享鎖問題—— 最小目錄節點時,並準備向監聽器列表中註冊監聽器時,最小目錄節點被刪除。

前言: zookeeper 分佈鎖的原來是 如圖:              使用zookeeper 來實現分散式鎖,發現一個問題,當客戶端A執行緒發出命令刪除最小的目錄lock_001被刪除時,並不會立刻刪除,因為命令是通過網路協議傳輸過去的,中間會產生一定的

zookeeper 永久節點 來保證叢集間一致性

先是封裝的 zkClientpublic class ZkClient { public Logger logger = LoggerFactory.getLogger(getClass()); public ZooKeeper zookeeper;

zookeeper內部機制與註冊機制

zookeeper應用:You can use it off-the-shelf to implement consensus, group management, leader election, and presence protocols. And

動態註冊

rod span desc -h pos acl row ora star SQL> select * from v$version where rownum=1; BANNER --------------------------------------

Android的廣播機制基礎1---動態註冊的使用,以獲得電池的使用狀態為例

以一個顯示手機電量和電池狀態的Demo為例。 1.要獲得電池的使用狀態,需在AndroidManifest.xml中新增使用許可權: <uses-permission android:name="android.permission.BATTERY_S

Oracle中靜態註冊的問題

問題描述:          用ASP.net基於oracle寫的BS web應用程式,出現一個困擾很久的問題,就是過一段時間就會莫名其妙地出現無法連線資料庫的問題。可是直接通過sqlplus本地連線又是可以的。只能是重啟oracle開頭的所有服務,連線成功。 查詢原因:

C++從零開始區塊鏈:P2P模組之節點註冊

ThreadPool是一個執行緒池,具體實現就不貼了,隨便找個執行緒池實現就行,也可以戳這裡檢視程式完整程式碼。 P2PNode::P2PNode(const char *if_name) { m_sock = socket(AF_INET, SOCK_DGRAM, 0);//I

Zookeeper伺服器節點動態上下線案例

需求: 某分散式系統中,主節點可以有多臺,可以動態上下線,任意一臺客戶端都能實時感知到主節點伺服器的上下線 服務端程式碼 package com.zyd.zook; import java.io.IOException; import org.apache.zookee

Springboot2(29)整合zookeeper的增刪改查、節點、分散式讀寫鎖、分散式計數器

原始碼地址 springboot2教程系列 實現zookeeper節點的增刪改查、節點監聽、分散式讀寫鎖、分散式計數器 新增依賴 <properties> <project.build.sourceEncoding

利用ZooKeeper API模擬HDFS節點模式

首先我們需要理解一件事情,雖然大多數人學習ZooKeeper是因為Hadoop和大資料,但實際上ZK只是分散式一致性演算法的實現,和大資料以及Hadoop並無任何關係。 ZK本身是一套樹樁結構的檔案系統,這個系統每個檔案節點可以存放一點兒資料。重點是這個檔案系統十分敏感,

Zookeeper(2)---節點屬性、和許可權

之前通過客戶端連線之後我們已經知道了zk相關的很多命令(Zookeeper(1)---初識)。 節點屬性: 現在我們就通過stat指令來看看節點都有哪些屬性,或者使用get 指令和-s引數來檢視節點資料以及屬性   abc  節點資料 cZxid = 0x5 建立節

節點2上crsd無法啟動,數據庫和無法自動啟動,比如ocrconfig、ocrcheck以及srvct

oracle 數據庫 操作系統 信息 手工 CRSD進程在11g中的變化在11.2中,CRSD進程不再是RAC中最關鍵的進程之一。如果對10g RAC比較熟悉,應該清楚CRSD進程的重要性,Oracle在操作系統啟動後,就是通過啟動這個進程然後啟動整個CLUSTER以及數據庫的。在11.2

Oracle程序未啟動或數據庫服務未註冊到該

數據庫服務 pri ora iat lis oracle服務 建數據庫 ip地址 his oracle新建數據庫的時候提示Could not find appropriate listener for this database要做的操作如下: 1、查看netmanage

zookeeper源碼之配置

com continue 點數據 lis process 節點數 hashset ace tree   配置存儲不僅維護了一個樹結構,還對各個節點添加了變更監聽。 類圖   DataTree內部維護兩個通知管理器,分別監聽節點數據變更和子節點變更。 public cl

Curator之PathChildrenCache子節點

Curator之PathChildrenCache子節點監聽: /*子節點監聽*/ //子節點新增watcher //PathChildrenCache:監聽資料節點的增刪改,會觸發事件 String childNodePathCach

Curator之nodeCache一次註冊,N次

Curator之nodeCache一次註冊,N次監聽 /*Curator之nodeCache一次註冊,N次監聽*/ //為節點新增watcher //監聽資料節點的變更,會觸發事件 final NodeCache nodeCache =

ZooKeeper Watcher機制(資料變更的通知)(二)(分析)

緊接著上一篇部落格:https://blog.csdn.net/Dongguabai/article/details/82970852 在輸出內容中有這樣兩個結果: 在ZooKeeper中,介面類Watcher用於表示一個標準的事件處理器,其定義了事件通知相關的邏輯,包含Ke

JQuery 節點

into 修改 style doc 監聽 msu moved rom 其他 DOMSubtreeModified: 在DOM結構發生任何變化的時候。這個事件在其他事件觸發後都會觸發; 1 $(".attr_box").bind("DOMSubtreeModified"

Zookeeper機制api與原理

1.連線Zookeeper,註冊監聽 ZooKeeper zkCli = new ZooKeeper("192.168.50.183:2181,192.168.50.184:2181,192.168.50.185:2181", 3000, new Watcher() { //