1. 程式人生 > >Zookeeper的java實例

Zookeeper的java實例

zookeeper

還是在之前的模塊中寫這個例子:

技術分享

註意在pom.xml中加上Zookeeper的依賴,

技術分享

現在開始寫ZookeeperDemo.java

import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;


public class ZookeeperDemo implements Watcher{

    Logger logger = Logger.getLogger(ZookeeperDemo.class);

    protected CountDownLatch countDownLatch = new CountDownLatch(1);

    //緩存時間
    private static final int SESSION_TIME = 2000;

    public static ZooKeeper zooKeeper = null;

    /**
     * 監控所有被觸發的事件
     * @param watchedEvent
     */
    public void process(WatchedEvent watchedEvent) {
        logger.info("收到事件通知:" + watchedEvent.getState());
        if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
            countDownLatch.countDown();
        }
    }

    public void connect(String hosts){
        try{
            if(zooKeeper == null){
                //zk客戶端允許我們將ZK服務的所有地址進行配置
                zooKeeper = new ZooKeeper(hosts,SESSION_TIME,this);
                //使用countDownLatch的await
                countDownLatch.await();
            }

        }catch(IOException e){
            logger.error("連接創建失敗,發生 IOException :" + e.getMessage());
        } catch (InterruptedException e) {
            logger.error("連接創建失敗,發生 InterruptedException :" + e.getMessage());
        }
    }

    /**
     * 關閉連接
     */
    public void close(){
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        }catch (InterruptedException e){
            logger.error("釋放連接錯誤 :"+ e.getMessage());
        }
    }
}

我們詳細解釋一下為什麽要有這個類:

這個類是實現了Watcher接口:Watcher機制:目的是為ZK客戶端操作提供一種類似於異步獲取數據的操作。采用Watcher方式來完成對節點狀態的監視,通過對/hotsname節點的子節點變化事件的監聽來完成這一目標。監聽進程是作為一個獨立的服務或者進程運行的,它覆蓋了 process 方法來實現應急措施。

這裏面涉及到的類:CountDownLatch:CountDownLatch是一個同步的工具類,允許一個或多個線程一直等待,直到其他線程的操作執行完成後再執行。在Java並發中,countdownLatch是一個常見的概念。CountDownLatch是在java1.5被引入的,跟它一起被引入的並發工具類還有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它們都存在於java.util.concurrent包下。

CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作後再執行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有的框架服務之後再執行。CountDownLatch是通過一個計數器來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。

下面是本例裏面用到的CountDownLatch的構造方法和其註釋:

/**
 * Constructs a {@code CountDownLatch} initialized with the given count.
 *
 * @param count the number of times {@link #countDown} must be invoked
 *        before threads can pass through {@link #await}
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

這段釋義應該是說:以給定的count值來初始化一個countDownLatch對象。其中count是指等待的線程在count個線程執行完成後再執行。CountDownLatch是通過一個計數器來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然後在閉鎖上等待的線程就可以恢復執行任務。

那麽count(計數值)實際上就是閉鎖需要等待的線程數量。

與CountDownLatch的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程後立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。

其他N 個線程必須引用閉鎖對象,因為他們需要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。所以當N個線程都調 用了這個方法,count的值等於0,然後主線程就能通過await()方法,恢復執行自己的任務。

更多CountDownLatch類可以參考:http://www.importnew.com/15731.html


Zookeeper類Zookeeper 中文API 】:http://www.cnblogs.com/ggjucheng/p/3370359.html

下面這是本例用到的Zookeeper的構造方法:第一個是主機地址,第二個是會話超時時間、第三個是監視者。

ZooKeeper(String connectStringsessionTimeoutWatcher watcher) IOException {
    (connectStringsessionTimeoutwatcher)}


再來看第二個類:ZookeeperOperation.java。確切的說,這個名字改錯了,應該叫ZNodeOperation。因為這就是對節點進行操作的一個類:

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.data.Stat;

import java.util.List;

public class ZookeeperOperation {
    Logger logger = Logger.getLogger(ZookeeperOperation.class);
    ZookeeperDemo zookeeperDemo = new ZookeeperDemo();

    /**
     * 創建節點
     * @param path 節點路徑
     * @param data  節點內容
     * @return
     */
    public boolean createZNode(String path,String data){
        try {
            String zkPath = ZookeeperDemo.zooKeeper.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            logger.info("Zookeeper創建節點成功,節點地址:" + zkPath);
            return true;
        } catch (KeeperException e) {

            logger.error("創建節點失敗:" + e.getMessage() + ",path:" + path  ,e);
        } catch (InterruptedException e) {
            logger.error("創建節點失敗:" + e.getMessage() + ",path:" + path  ,e);
        }
        return false;

    }

    /**
     * 刪除一個節點
     * @param path 節點路徑
     * @return
     */
    public boolean deteleZKNode(String path){
        try{
            ZookeeperDemo.zooKeeper.delete(path,-1);
            logger.info("Zookeeper刪除節點1成功,節點地址:" + path);
            return  true;
        }catch (InterruptedException e){
            logger.error("刪除節點失敗:" + e.getMessage() + ",path:" + path,e);
        }catch (KeeperException e){
            logger.error("刪除節點失敗:" + e.getMessage() + ",path:" + path,e);
        }
        return false;
    }

    /**
     * 更新節點內容
     * @param path 節點路徑
     * @param data 節點數據
     * @return
     */
    public boolean updateZKNodeData(String path,String data){
        try {
            Stat stat = ZookeeperDemo.zooKeeper.setData(path,data.getBytes(),-1);
            logger.info("更新節點數據成功,path:" + path+", stat:" + stat);
            return  true;
        } catch (KeeperException e) {
            logger.error("更新節點數據失敗:" + e.getMessage() + ",path:" + path ,e);
        } catch (InterruptedException e) {
            logger.error("更新節點數據失敗:" + e.getMessage() + ",path:" + path ,e);
        }
        return false;
    }

    /**
     * 讀取指定節點的內容
     * @param path 指定的路徑
     * @return
     */
    public String readData(String path){
        String data=null;
        try {
            data = new String(ZookeeperDemo.zooKeeper.getData(path,false,null));
            logger.info("讀取數據成功,其中path:" + path+ ", data-content:" + data);
        } catch (KeeperException e) {
            logger.error( "讀取數據失敗,發生KeeperException! path: " + path + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            logger.error( "讀取數據失敗,InterruptedException! path: " + path + ", errMsg:" + e.getMessage(), e );
        }
      return data;
    }

    /**
     * 獲取某個節點下的所有節點
     * @param path 節點路徑
     * @return
     */
    public List<String> getChild(String path){
        try {
            List<String> list = ZookeeperDemo.zooKeeper.getChildren(path,false);
            if(list.isEmpty()){
                logger.info(path + "的路徑下沒有節點");
            }
            return list;
        } catch (KeeperException e) {
            logger.error( "讀取子節點數據失敗,發生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            logger.error( "讀取子節點數據失敗,InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return null;
    }

    public boolean isExists(String path){
        try {
            Stat stat = ZookeeperDemo.zooKeeper.exists(path,false);
            return null != stat;
        } catch (KeeperException e) {
            logger.error( "讀取數據失敗,發生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            logger.error( "讀取數據失敗,發生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return  false;
    }
}

其中,有個creatZNode()方法中,有用到:

String zkPath = ZookeeperDemo.zooKeeper.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

這個方法是這樣的:String create(String path, byte[] data, List<ACL> acl,CreateMode createMode);

創建一個給定的目錄節點 path, 並給它設置數據,CreateMode 標識有四種形式的目錄節點,分別是 PERSISTENT:持久化目錄節點,這個目錄節點存儲的數據不會丟失;PERSISTENT_SEQUENTIAL:順序自動編號的目錄節點,這種目錄節點會根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功創建的目錄節點名;EPHEMERAL:臨時目錄節點,一旦創建這個節點的客戶端與服務器端口也就是 session 超時,這種節點會被自動刪除;EPHEMERAL_SEQUENTIAL:臨時自動編號節點。

更多關於Zookeeper的中文API,參考這個博客:

http://www.cnblogs.com/ggjucheng/p/3370359.html


第三個類,ZookeeperCliTest.java就是測試啦:

import java.util.List;

public class ZookeeperCliTest {
    public static void main(String[] args){
        //定義父子類節點路徑
        String rootPath = "/ZookeeperRoot01";
        String childPath1 = rootPath+ "/child101";
        String childPath2 = rootPath+ "/child201";

        //ZookeeperOperation操作API
        ZookeeperOperation zookeeperOperation = new ZookeeperOperation();

        //連接Zookeeper服務器
        ZookeeperDemo zookeeperDemo =new ZookeeperDemo();
        zookeeperDemo.connect("127.0.0.1:2181");

        //創建節點
        if(zookeeperOperation.createZNode(rootPath,"<父>父節點數據")){
            System.out.println("節點 [ " +rootPath + " ],數據 [ " + zookeeperOperation.readData(rootPath)+" ]");
        }

        // 創建子節點, 讀取 + 刪除
        if ( zookeeperOperation.createZNode( childPath1, "<父-子(1)>節點數據" ) ) {
            System.out.println( "節點[" + childPath1 + "]數據內容[" + zookeeperOperation.readData( childPath1 ) + "]" );
            zookeeperOperation.deteleZKNode(childPath1);
            System.out.println( "節點[" + childPath1 + "]刪除值後[" + zookeeperOperation.readData( childPath1 ) + "]" );
        }

        // 創建子節點, 讀取 + 修改
        if ( zookeeperOperation.createZNode(childPath2, "<父-子(2)>節點數據" ) ) {
            System.out.println( "節點[" + childPath2 + "]數據內容[" + zookeeperOperation.readData( childPath2 ) + "]" );
            zookeeperOperation.updateZKNodeData(childPath2, "<父-子(2)>節點數據,更新後的數據" );
            System.out.println( "節點[" + childPath2+ "]數據內容更新後[" + zookeeperOperation.readData( childPath2 ) + "]" );
        }

        // 獲取子節點
        List<String> childPaths = zookeeperOperation.getChild(rootPath);
        if(null != childPaths){
            System.out.println( "節點[" + rootPath + "]下的子節點數[" + childPaths.size() + "]" );
            for(String childPath : childPaths){
                System.out.println(" |--節點名[" +  childPath +  "]");
            }
        }
        // 判斷節點是否存在
        System.out.println( "檢測節點[" + rootPath + "]是否存在:" + zookeeperOperation.isExists(rootPath)  );
        System.out.println( "檢測節點[" + childPath1 + "]是否存在:" + zookeeperOperation.isExists(childPath1)  );
        System.out.println( "檢測節點[" + childPath2 + "]是否存在:" + zookeeperOperation.isExists(childPath2)  );


        zookeeperDemo.close();
    }

}

由於開啟了Logger的打印日誌到控制臺,所以運行結果如下:

技術分享

以上是main啟動的打印信息。

技術分享


技術分享

以上的漢字就是我們在代碼中要求Logger打印到控制臺的。

若不想看到這麽多信息,可以註釋掉log4j.properties.結果如下:除了System.out.println()的打印結果,Logger一個都沒有出現。

技術分享


log4j.properties的內容如下:

#logger
log4j.rootLogger=debug,appender1
log4j.appender.appender1=org.apache.log4j.ConsoleAppender
log4j.appender.appender1.layout=org.apache.log4j.TTCCLayout


Zookeeper的java實例