
ZooKeeper Learning Series, Part 2: API Practice

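The previous chapter covered an introduction to ZooKeeper, how to start it, how to set nodes, and its structure and performance. In this section we play with the API and fetch some data.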

PHP version: http://anykoro.sinaapp.com/2013/04/05/%E4%BD%BF%E7%94%A8apache-zookeeper%E5%88%86%E5%B8%83%E5%BC%8F%E9%83%A8%E7%BD%B2php%E5%BA%94%E7%94%A8%E7%A8%8B%E5%BA%8F/

Go version: http://mmcgrana.github.io/2014/05/getting-started-with-zookeeper-and-go.html

Start by reading the official Java example: http://zookeeper.apache.org/doc/trunk/javaExample.html

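My first reaction: what the fuck is this?

I just want to read one piece of data. Does it really need to be this complicated?

So let's rework it by hand.

Version 1: just fetch the data and ignore everything else: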

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class ZkReader {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hostPort = "192.168.1.2,192.168.1.3,192.168.1.4";
        String znode = "/test";
        ZooKeeper zk = new ZooKeeper(hostPort, 3000, null);
        System.out.println(new String(zk.getData(znode,false,null)));
    }
}

Create /test in zkCli and set its value to 123, then run the program. Output:

123

The result comes back, but an error is logged as well:

14/10/17 11:51:58 ERROR zookeeper.ClientCnxn: Error while calling watcher 
java.lang.NullPointerException
    at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:521)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:497)

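Looking at the source, the client expects a watcher to be registered: the event thread calls the watcher we passed in, and ours is null. Does that mean that without a watcher ZooKeeper is nothing more than a plain config store? OK.

Version 2: zk get data + watcher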

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;

public class ZkReader {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        ZooKeeper zk = new ZooKeeper(hostPort, 3000, new MyWatcher());
        System.out.println(new String(zk.getData(znode,false,null)));
    }
}

class MyWatcher  implements Watcher {

    @Override
    public void process(WatchedEvent event) {
        System.out.println("hello zookeeper");
        System.out.println(String.format("hello event! type=%s, stat=%s, path=%s",event.getType(),event.getState(),event.getPath()));
    }
}

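The output, however, is:

hello zookeeper
123
hello event! type=None, stat=SyncConnected, path=null

Why does the data always land in the middle? I could not work it out, so I asked on the mailing list and got a reply a few days later (the list is not very active any more):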

Zookeeper works asynchronously in several threads. Therefore the sequence of execution in different threads is not generally predictable. It could therefore happen that when the connection status change is detected, the Watcher is executed, but only the first "hello zookeeper" gets echoed, then the main thread gets some cycles again and prints "123", after which the second print statement "hello event!..." is executed. If you don't want this to happen, use a CountDownLatch to make the main thread wait until the Zookeeper connection is established and properly recognized in your program. The main thread creates the CountDownLatch(1), opens the Zk connection and waits latch.await(). The Watcher does its job and then counts the latch down by one, causing the main thread to leave the await and continue doing its job.

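So it is considered a multithreading issue. Creating the zk connection starts several threads, including sendThread and eventThread.

The eventThread gets halfway through the watcher, the main thread gets the CPU and prints the result, and then the eventThread carries on with the rest of watcher.process.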
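The CountDownLatch approach suggested in the reply can be sketched without a ZooKeeper server. This is only an illustration of the pattern: `connectAsync` and the thread name are hypothetical stand-ins for the client's event thread, and the literal "123" stands in for the value that zk.getData would return.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchDemo {
    // Hypothetical stand-in for the client's event thread:
    // runs the "watcher" callback on a background thread.
    static void connectAsync(Runnable onConnected) {
        new Thread(onConnected, "fake-event-thread").start();
    }

    // Returns the output lines in the order they were produced.
    static List<String> runOnce() {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch connected = new CountDownLatch(1);

        connectAsync(() -> {
            out.add("hello zookeeper");
            out.add("hello event!");
            // Release the main thread only after the callback has finished its output.
            connected.countDown();
        });

        try {
            // The main thread blocks here until the callback counts the latch down.
            if (!connected.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the connection callback");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        out.add("123"); // stands in for new String(zk.getData(znode, false, null))
        return out;
    }

    public static void main(String[] args) {
        runOnce().forEach(System.out::println);
    }
}
```

In version 2 the same idea would mean handing the latch to MyWatcher, calling countDown() at the end of process(), and calling await() in main before zk.getData, so the data can no longer appear between the two watcher lines.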

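These two versions only fetch the data. What if the data changes and we want to pick up the update automatically? OK, following the example that zk ships, here is a simplified third version:

DataMonitor.java: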

/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Watcher, StatCallback {

    ZooKeeper zk;

    String znode;

    boolean dead;

    DataMonitorListener listener;

    byte prevData[];

    public DataMonitor(ZooKeeper zk, String znode,  DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.listener = listener;
        // Get things started by checking if the node exists. We are going
        // to be completely event driven
        zk.exists(znode, true, this, null);
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * The data of the node has changed (data is null if the node cannot be read).
         */
        void showData(byte data[]);
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: "+event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc : "+rc);

        byte b[] = null;
        try {
            b = zk.getData(znode, false, null);
        } catch (KeeperException e) {
            // We don't need to worry about recovering now. The watch
            // callbacks will kick off any exception handling
            e.printStackTrace();
        } catch (InterruptedException e) {
            return;
        }
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.showData(b);
            prevData = b;
        }
    }
}

Executor.java:

import java.io.IOException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
        implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
    DataMonitor dm;
    ZooKeeper zk;

    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        dm = new DataMonitor(zk, znode, this);
    }

    public static void main(String[] args) {
        String hostPort = "192.168.1.22,192.168.1.12,192.168.1.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***************************************************************************
     * We don't process any events ourselves; we just forward them on.
     *
     * @see org.apache.zookeeper.Watcher#
     */
    public void process(WatchedEvent event) {
        System.out.println("Executor process event: "+event.getType());
        dm.process(event);

    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void showData(byte[] data) {
        // data is null when the node is missing or the read failed
        System.out.println("data changes: " + (data == null ? "<no data>" : new String(data)));
    }
}

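One executor and one monitor. A watcher is registered with zk; when an event fires, the monitor passes itself to ZooKeeper as the StatCallback, and when the node changes, processResult is called to show the result.

Executor process event: NodeDataChanged
watch event type: NodeDataChanged
rc : 0
data changes: abcd

This is still a bit complicated. On a closer look, DataMonitor does not seem necessary: all I need is one class that starts the zk client and listens for data changes. Hence the fourth, single-object version:

Executor.java: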
import java.io.IOException;
import java.util.Arrays;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class Executor
        implements Watcher, Runnable, AsyncCallback.StatCallback
{
    ZooKeeper zk;
    String znode;
    byte prevData[];

    public Executor(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        this.znode = znode;
        zk.exists(znode, true, this, null);
    }

    public static void main(String[] args) {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***************************************************************************
     * We process watch events ourselves: when the watched znode changes,
     * check it again, which also re-registers the watch.
     *
     * @see org.apache.zookeeper.Watcher#
     */
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: "+event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }

    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc : "+rc);

        byte b[] = null;
        try {
            b = zk.getData(znode, false, null);
        } catch (KeeperException e) {
            // We don't need to worry about recovering now. The watch
            // callbacks will kick off any exception handling
            e.printStackTrace();
        } catch (InterruptedException e) {
            return;
        }
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            // b is null when the node is missing or the read failed
            System.out.println("data changes: " + (b == null ? "<no data>" : new String(b)));
            prevData = b;
        }
    }
}

The class acts as its own watcher and registers itself with zk as the callback, which is more concise.

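In testing, with one of the three zk servers stopped (one leader and one follower left), the service kept working. With only one server left, the client reported errors and could not connect. After a zk server was restarted to get back to two, the client still did not recover; only a restart brought it back.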
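The first two observations match ZooKeeper's majority rule: an ensemble can serve requests only while more than half of its configured servers are up. A minimal sketch of that arithmetic follows; the class and method names are made up for illustration and are not part of the ZooKeeper API. It says nothing about why the client failed to reconnect afterwards.

```java
class QuorumMath {
    // Smallest number of servers that forms a strict majority of the ensemble.
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // True when enough servers are alive to form a quorum.
    static boolean canServe(int ensembleSize, int alive) {
        return alive >= quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        int ensemble = 3;
        for (int alive = ensemble; alive >= 1; alive--) {
            System.out.println(alive + " of " + ensemble + " alive, can serve: "
                    + canServe(ensemble, alive));
        }
    }
}
```

For three servers the quorum is two, so losing one server is tolerated and losing two is not.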

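After reading the PHP API I got a better grasp of what zk.exists does. Both exists and get register a callback: one registers a watcher, the other a StatCallback. When an event fires, the watch is consumed, so we need to set the watch again inside the callback. Hence the fifth version: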

import java.io.IOException;

import org.apache.zookeeper.*;

public class Executor
        implements Watcher, Runnable
{
    ZooKeeper zk;
    String znode;

    public Executor(String hostPort, String znode) throws KeeperException, IOException, InterruptedException {
        zk = new ZooKeeper(hostPort, 30000, this);
        this.znode = znode;
        zk.getData(znode, this, null);
    }

    public static void main(String[] args) {
        String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13";
        String znode = "/test";
        try {
            new Executor(hostPort, znode).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() != Event.EventType.None) {
            System.out.println("watch event type: "+event.getType());
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                try {
                    // Passing `this` again re-registers the one-shot watch for the next change
                    System.out.println(new String(zk.getData(znode, this, null)));
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // preserve the interrupt status
                }
            }
        }

    }

    public void run() {
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }
}
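Version 5 passes `this` to getData again inside process() because a watch fires only once. The toy model below shows that effect in isolation. `ToyNode` is a hypothetical in-memory stand-in, not ZooKeeper's implementation, and its watch callback receives only an event type, much as a real watch event carries a type and path but no data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class OneShotWatchDemo {
    // Toy in-memory node with one-shot watches (illustration only).
    static class ToyNode {
        private String data;
        private final List<Consumer<String>> watches = new ArrayList<>();

        ToyNode(String data) { this.data = data; }

        // Read the data and optionally leave a one-shot watch behind.
        String getData(Consumer<String> watch) {
            if (watch != null) watches.add(watch);
            return data;
        }

        // Change the data; every pending watch fires once and is then discarded.
        void setData(String newData) {
            data = newData;
            List<Consumer<String>> fired = new ArrayList<>(watches);
            watches.clear(); // consumed before firing, so callbacks can re-register
            for (Consumer<String> w : fired) w.accept("NodeDataChanged");
        }
    }

    // Returns the values the watcher observed across two updates.
    static List<String> run(boolean reRegister) {
        ToyNode node = new ToyNode("123");
        List<String> seen = new ArrayList<>();

        class Reader implements Consumer<String> {
            @Override
            public void accept(String eventType) {
                // Re-read the data; pass `this` again only if we want further events.
                seen.add(node.getData(reRegister ? this : null));
            }
        }

        node.getData(new Reader()); // initial read sets the first watch
        node.setData("a");
        node.setData("b");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("re-registering: " + run(true));
        System.out.println("not re-registering: " + run(false));
    }
}
```

With re-registration the reader sees both updates; without it, only the first one arrives and later changes pass unnoticed.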

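The last two versions can already detect changes to the zk data node, but they do not handle error cases or the close event. You can try reworking the hard-to-follow http://zookeeper.apache.org/doc/trunk/javaExample.html example yourself.

More Java API operations (creating, deleting, and modifying nodes, and so on): http://www.cnblogs.com/haippy/archive/2012/07/19/2600032.html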