Zookeeper實現服務上下線監控服務列表
阿新 • • 發佈:2019-02-05
package com.billstudy.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * 註冊服務,並且自動監聽可用服務列表 * @author Bill * @since V1.0 2015年6月24日 - 上午10:03:24 */ public class AppServer { private ZooKeeper zk = null; // 樹字首 private static final String zkParentPrefix = "/appserver"; private static final String zkChildPrefix = "/app"; // 維護可用列表 private final ArrayList<String> availableServerList = new ArrayList<String>(); public void connectZk(String address){ try { zk = new ZooKeeper("hadoop-server05:2181,hadoop-server06:2181,hadoop-server07:2181", 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.toString()); if (event.getType() == EventType.NodeChildrenChanged && event.getPath().startsWith(zkParentPrefix) ) { flushServerList(); } } }); // 如果根節點沒有,則先建立 if (zk.exists(zkParentPrefix, true) == null) { zk.create(zkParentPrefix, "AppServer root dir ".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("znode :" + zkParentPrefix + "is not exists , create successful !"); } // 根據當前address建立臨時連續子節點,這樣多個不同的app child節點不會重複。 zk會自己維護序列 String childPath = zk.create(zkParentPrefix + zkChildPrefix, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("create " + childPath + " successful, address is :" + address); flushServerList(); } catch (Exception e) { e.printStackTrace(); } } /** * 收到子節點更新後,重新整理當前可用服務列表 * @author Bill * @since V1.0 2015年6月24日 - 上午10:08:19 */ protected void flushServerList() { availableServerList.clear(); try { List<String> children = zk.getChildren(zkParentPrefix, true); for (String child : children) { byte[] data = zk.getData(zkParentPrefix + "/" + child, true,new Stat()); availableServerList.add(new String(data,"UTF-8")); } System.out.println("current available server list:" + availableServerList); } catch (Exception e) { e.printStackTrace(); } } /** * 此處可以用來處理業務邏輯,目前讓主執行緒掛起 * @author Bill * @since V1.0 2015年6月24日 - 上午10:24:00 */ public void handle(){ try { // System.out.println("handle ..."); TimeUnit.HOURS.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { if (args.length != 1) { System.err.println("The program first argument must be address !"); System.exit(1); } AppServer appServer = new AppServer(); appServer.connectZk(args[0]); appServer.handle(); } }