ZooKeeper Java Example(官方例子)
為了向您介紹ZooKeeper Java API,我們在這裏開發了一個非常簡單的觀看客戶端。該ZooKeeper客戶端通過啟動或停止程序來觀察ZooKeeper節點的更改並進行響應。
要求
有四個要求:
1.它作為參數:
ZooKeeper服務的地址
那麽znode的名字就是被觀看的
具有參數的可執行文件
2.它獲取與znode相關聯的數據,並啟動可執行文件。
3.如果znode更改,客戶端將重新啟動內容並重新啟動可執行文件。
4.如果znode消失,客戶端將殺死可執行文件。
程序設計
通常,ZooKeeper應用程序分為兩個單元,一個維護連接,另一個用於監視數據。
此外,Executor包含主線程並包含執行邏輯。它負責什麽樣的用戶交互,以及與您作為參數傳遞的可執行程序的交互以及根據znode的狀態關閉和重新啟動示例。
1. Executor.java
package com.hellojd.cloud; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; /** * A simple example program to use DataMonitor to start and * stop executables based on a znode. The program watches the * specified znode and saves the data that corresponds to the * znode in the filesystem. It also starts the specified program * with the specified arguments when the znode exists and kills * the program if the znode goes away. */ public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { DataMonitor dm; ZooKeeper zk; String filename; String exec[]; Process child; public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } /** * @param args */ public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } /*************************************************************************** * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { System.out.println("Watcher process"); dm.process(event); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } //以響應ZooKeeper連接永久消失。 public void closing(int rc) { synchronized (this) { notifyAll(); } } static class StreamWriter extends Thread { OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); } public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { } } } public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } //保存znode數據至文件 try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } }
2. DataMonitor.java
/** * A simple class that monitors the data and existence of a ZooKeeper * node. It uses asynchronous ZooKeeper APIs. */ package com.hellojd.cloud; import java.util.Arrays; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; /** * 另一方面,DataMonitorListener接口不是ZooKeeper API的一部分。 它是一個完全定制的界面,專為此示例應用程序而設計。 * DataMonitor對象使用它來回傳給它的容器,它也是Executor對象。 */ public class DataMonitor implements StatCallback { //Executor或一些類似Executor的對象“擁有”ZooKeeper連接,但可以將事件委托給其他事件到其他對象。 ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; //簡單地將這些事件轉發到DataMonitor來決定如何處理它們 DataMonitorListener listener; byte prevData[]; // 它主要是異步和事件驅動 public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); } /** * 該接口在DataMonitor類中定義,並在Executor類中實現。 當調用Executor.exists()時,執行器根據要求決定是啟動還是關閉。 * 當znode不再存在時,需要說的是殺死可執行文件。 */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } //響應ZooKeeper狀態的更改 public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Watcher.Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don‘t need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It‘s all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let‘s find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } public void processResult(int rc, String path, Object ctx, Stat stat) { /** * 首先檢查znode存在,致命錯誤和可恢復錯誤的錯誤代碼。 * 如果文件(或znode)存在,它將從znode獲取數據,然後調用Executor的exists()回調, * 如果狀態已更改。 註意,它不必對getData調用執行異常處理,因為它具有掛起的任何可能導致錯誤的監視器: * 如果節點在調用ZooKeeper.getData()之前被刪除,則由ZooKeeper設置的監視事件 .exists()觸發回調; *如果發生通信錯誤,連接回顯將觸發連接監視事件。 */ boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } //文件(或znode)存在 byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don‘t need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } } }
調試:
參數列表:192.168.0.10:2181 /hellojd_node filename calc
192.168.0.10:2181:ZK地址
/hellojd_node :監視node
filename :備份數據文件
calc:命令
來自:http://zookeeper.apache.org/doc/current/javaExample.html
本文出自 “簡單” 博客,請務必保留此出處http://dba10g.blog.51cto.com/764602/1975090
ZooKeeper Java Example(官方例子)