1. 程式人生 > >使用java操作zookeeper(五)

使用java操作zookeeper(五)

之前使用的客戶端是3.4.6,後來換成了3.5.3-beta 版本,前面的知識中也有對3.5.x一些知識點的補充,往後的客戶端版本都使用該版本,下面的介紹也是基於3.5.3-beta的。

java操作zookeeper

pom新增依賴

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.3-beta</version>
</dependency>

主類:org.apache.zookeeper.ZooKeeper

這是ZooKeeper客戶端庫的主要類。要使用ZooKeeper服務,應用程式必須首先例項化ZooKeeper類的物件。所有操作zookeeper的操作都將通過呼叫ZooKeeper類的方法來完成。除非另有說明,否則此類的方法是執行緒安全的。 客戶端與伺服器建立連線後,會為客戶端分配一個會話ID。客戶端將定期向伺服器傳送心跳以保持會話有效。只要客戶端的會話ID保持有效,應用程式就可以通過客戶端呼叫ZooKeeper API。 如果由於某種原因,客戶端長時間無法向伺服器傳送心跳(例如,超過sessionTimeout值),則伺服器將使會話到期,並且會話ID將失效。客戶端物件將不再可用。要進行ZooKeeper API呼叫,應用程式必須建立一個新的客戶端物件。 如果客戶端當前連線的ZooKeeper伺服器出現故障或者沒有響應,則客戶端將在其sessionID到期之前自動嘗試連線到另一臺zookeeper伺服器。如果成功,應用程式可以繼續使用客戶端。 ZooKeeper API方法有同步或非同步兩種。同步方法會阻塞,直到伺服器響應。非同步方法只是將傳送請求排隊並立即返回。它們採用一個回撥物件,該回調物件將在成功執行請求時執行,或者在錯誤時執行,並返回指示錯誤的返回程式碼。 一些成功的ZooKeeper API呼叫可以將監視(Watcher)留在ZooKeeper伺服器中的“資料節點”上。其他成功的ZooKeeper API呼叫可以觸發這些Watcher。一旦Watcher被觸發,事件將被傳遞給客戶,假如Watcher觸發之後,立刻get節點資訊,客戶端在得到 Watch 訊息之前肯定不可能看到更新後的資料。換句話說,更新通知先於更新結果。Watch 只會被觸發一次。如果客戶端想得到後續更新的通知,必須要在 Watch 被觸發後重新註冊一個 Watch。 當客戶端丟失當前連線後重新連線伺服器,所有被認為是觸發的監視器,但是沒有送達的事件將丟失。為了模式這個場景,客戶端將產生一個特殊的事件,去告訴事件處理器連線被刪除。這個特殊的事件的型別是EventNone 狀態是KeeperStateDiscounnected。

ZooKeeper類有如下幾個構造器:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly);
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)
; ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig); ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig clientConfig); ZooKeeper(String connectString,
int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd); ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly); ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider); ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig clientConfig);

注意:每個構造器建立連線都是非同步的,構造方法啟動與伺服器的連線,然後立馬返回,此時會話處於CONNECTING狀態,通過watcher通知。此通知可以在構造方法呼叫返回之前或之後的任何時候到來。會話建立成功之後,狀態會改為CONNECTED。 構造器將丟擲兩個異常: java.io.IOException - 在網路故障的情況下 java.lang.IllegalArgumentException - ZooKeeper無效服務列表,或者 指定了無效的chroot路徑(下面介紹connectString引數時會介紹chroot)

引數介紹:
引數名 描述
connectString 要建立ZooKeeper客戶端物件,應用程式需要傳遞一個連線字串,其中包含逗號分隔的host:port列表,每個對應一個ZooKeeper伺服器。例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183例項化的ZooKeeper客戶端物件將從connectString中選擇一個任意伺服器並嘗試連線到它。如果建立連線失敗,將嘗試連線字串中的另一個伺服器(順序是非確定性的,因為是隨機),直到建立連線。客戶端將繼續嘗試,直到會話顯式關閉。在3.2.0版本之後,也可以在connectString後面新增字尾字串,如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/app/a,客戶端連線上ZooKeeper伺服器之後,所有對ZooKeeper的操作,都會基於這個根目錄。例如,客戶端對/foo/bar的操作,都會指向節點/app/a/foo/bar——這個目錄也叫Chroot,即客戶端隔離名稱空間。
sessionTimeout 會話超時(以毫秒為單位)客戶端和服務端連線建立成功之後,ZooKeeper中會建立一個會話,在一個會話週期內,ZooKeeper客戶端和服務端之間會通過心跳檢測機制來維持會話的有效性,一旦在sessionTimeout時間內沒有進行有效的心跳檢測,會話就會失效。
watcher 建立ZooKeeper客戶端物件時,ZooKeeper允許客戶端在構造方法中傳入一個介面Watcher(org.apache.zookeeper.Watcher)的實現類物件來作為預設的Watcher事件通知處理器。當然,該引數可以設定為null以表明不需要設定預設的Watcher處理器。如果設定為null,日誌中會有空指標異常,但是並不影響使用。
canBeReadOnly 3.4之後新增的boolean型別的引數,用於標識當前會話是否支援“read-only”模式。預設情況下,在ZooKeeper叢集中,一個機器如果和叢集中過半以上機器失去了網路連線,那麼這個機器將不再處理客戶端請求(包括讀寫請求)。但是在某些使用場景下,當ZooKeeper伺服器發生此類故障的時候,我們還是希望ZooKeeper伺服器能夠提供讀服務(當然寫服務肯定無法提供)——這就是ZooKeeper的“read-only”模式。
sessionId 和 sessionPasswd 會話id和 會話密碼,這兩個引數能夠唯一確定一個會話,同時客戶端使用這兩個引數實現客戶端會話複用,從而達到恢復會話的效果,使用方法:第一次連線上ZooKeeper伺服器後,客戶端使用getSessionId()和getSessionPasswd()獲取這兩個值,如果需要會話複用,在重新建立ZooKeeper客戶端物件的時候可以傳過去,如果不需要會話複用,請使用不需要這些引數的其他建構函式。
HostProvider 客戶端地址列表管理器

關於列表管理器,具體參考部落格: 從Paxos到Zookeeper 分散式一致性原理與實踐7.3.2章_伺服器地址列表 https://blog.csdn.net/en_joker/article/details/79310801 ZKClientConfig:3.5.2版本之後新增的引數,傳遞此conf物件為每個客戶端提供了與其他例項相比不同的配置屬性,更加的靈活性。

常用javaAPI呼叫

package com.zookeeper.test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;

/**
 * @author: WangSaiChao
 * @date: 2018/9/10
 * @description: 
 */
public class TestZookeeper implements Watcher{

  public static void main(String[] args) throws Exception {

    //===============================建立會話======================================

    /**
     * 使用第一個構造器建立會話,剛建立完立刻列印會話狀態為 CONNECTING
     * 執行緒阻塞5秒,這5秒期間收到了服務端的Watcher通知 SyncConnected
     * 之後會話狀態為 CONNECTED
     */
    ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181,192.168.1.198:2181,172.12.96.123:2181", 5000, new TestZookeeper());
    System.out.println(zookeeper1.getState());
    Thread.sleep(2000);
    System.out.println(zookeeper1.getState());

    /**
     * 使用第三個構造器,sessionId 和 sessionPasswd用的是上一個連線
     */
    long sessionId = zookeeper1.getSessionId();
    byte[] sessionPasswd = zookeeper1.getSessionPasswd();
    ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, new TestZookeeper(),sessionId,sessionPasswd);
    System.out.println(zookeeper2.getState());
    Thread.sleep(2000);
    System.out.println(zookeeper2.getState());


    //===============================建立節點======================================
    /**
     * CreateMode
     *    PERSISTENT : 持久節點
     *    PERSISTENT_SEQUENTIAL : 持久順序節點
     *    EPHEMERAL : 臨時節點
     *    EPHEMERAL_SEQUENTIAL : 臨時順序節點
     *
     * 無論是同步還是非同步介面,ZooKeeper都不支援遞迴建立,即無法在父節點不存在的情況下建立一個子節點。
     * 另外,如果一個節點已經存在了,那麼建立同名節點的時候,會丟擲NodeExistException異常。如果是順序節點,那麼永遠不會丟擲NodeExistException異常
     * 臨時節點不能有子節點,建立節點時,如果給定的父節點是臨時節點,則會丟擲NoChildrenForEphemeralsException
     * 建立節點同時,也可以在節點上設定Watcher 當刪除節點或者setData時,將會觸發Watcher
     */


    /**
     * 同步建立一個持久節點,ACL為 world:anyone:cdrwa 等同於如下命令:
     * create /node 123 world:anyone:cdrwa
     */
    zookeeper1.create("/node",
        "123".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT);

    /**
     * 同步建立一個持久節點,ACL為 world:anyone:cdrwa 所有人只擁有建立的許可權,等同於如下命令:
     * create /node1 123 world:anyone:c
     */
    zookeeper1.create("/node1",
        "123".getBytes(),
        Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, ZooDefs.Ids.ANYONE_ID_UNSAFE)),
        CreateMode.PERSISTENT);

    /**
     * 非同步建立一個 臨時的順序節點,ACL為 ip:127.0.0.1:c 等同於如下命令:
     * create /node2 123 ip:127.0.0.1:c
     */
    zookeeper1.create("/node2",
        "123".getBytes(),
        Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, new Id("ip", "127.0.0.1"))),
        CreateMode.EPHEMERAL_SEQUENTIAL,
        new AsyncCallback.StringCallback() {
          @Override
          public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("rc:" + rc);
            System.out.println("path:" + path);
            System.out.println("ctx:" + ctx);
            System.out.println("name:" + name);
          }
        }, "傳給服務端的內容,會在非同步回撥時傳回來");
    /**
     * 注意這裡,執行緒睡眠20秒,因為是建立的臨時節點,如果不睡眠,你不能使用命令在控制檯看見建立的臨時節點
     */
    Thread.sleep(20000);


    /**
     * 非同步建立一個持久節點, ACL為 digest:wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw=:cdrwa,等同於如下命令:
     * create /node3 123 digest:wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw=:cdrwa
     */
    zookeeper1.create("/node3",
        "123".getBytes(),
        Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw="))),
        CreateMode.PERSISTENT,
        new AsyncCallback.StringCallback() {
          @Override
          public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("rc:" + rc);
            System.out.println("path:" + path);
            System.out.println("ctx:" + ctx);
            System.out.println("name:" + name);
          }
        }, "傳給服務端的內容,會在非同步回撥時傳回來");
    /**
     * 注意這裡,執行緒睡眠20秒,可以接收到watcher
     */
    Thread.sleep(20000);

    /**
     * 建立一個持久順序定時節點,如果在10000毫秒內 未修改node,並且沒有子節點,那麼它將被刪掉
     */
    zookeeper1.create("/node4",
        "123".getBytes(),
        Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw="))),
        CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL,
        new AsyncCallback.Create2Callback() {
          @Override
          public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
            System.out.println("rc:" + rc);
            System.out.println("path:" + path);
            System.out.println("ctx:" + ctx);
            System.out.println("name:" + name);
            System.out.println("stat:" + stat);
          }
        }, "傳給服務端的內容,會在非同步回撥時傳回來", 10000);
    Thread.sleep(20000);


    //===============================獲取節點資料======================================

    /**
     * 每一步操作/node3都要先執行該語句
     * 因為上一步建立的node3 添加了 digest ACL 所以在獲取該節點資訊的時候,要先新增授權
     */
    zookeeper1.addAuthInfo("digest","wangsaichao:123456".getBytes());

    /**
     * 同步呼叫
     * path 節點路徑
     * Watcher 監視
     * Stat 節點統計資訊
     * 新增授權之後同步獲取節點資訊,返回給定路徑的節點的資料和統計資訊
     * 如果呼叫成功,並且watcher引數不為空,則會在具有給定路徑的節點上保留監視,當刪除節點 或者 setData時候將會觸發監視
     *
     */
    byte[] data = zookeeper1.getData("/node3", new TestZookeeper(), new Stat());
    System.out.println(new String(data));
    /**
     * 注意這裡,執行緒睡眠2秒,因為是建立的臨時節點,如果不睡眠,你不能使用命令在控制檯看見建立的臨時節點
     */
    Thread.sleep(2000);

    /**
     * 非同步呼叫
     * path 節點路徑
     * watch true使用建立zookeeper時指定的預設watcher 如果為false則不設定監聽
     * DataCallback 非同步通知
     * ctx 回撥上下文
     */
    zookeeper1.getData("/node3", false, new AsyncCallback.DataCallback() {
      @Override
      public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println("rc:" + rc);
        System.out.println("path:" + path);
        System.out.println("ctx:" + ctx);
        System.out.println("data:" + new String(data));
        System.out.println("stat:" + stat);
      }
    },"傳給服務端的內容,會在非同步回撥時傳回來");
    Thread.sleep(2000);

    //===============================修改節點資料======================================

    /**
     * setData節點有version這一引數,給定版本與節點的版本匹配,則設定給定路徑的節點的資料(如果給定版本是-1,則它匹配任何節點的版本)。返回節點的統計資訊。
     * 如果不存在具有給定路徑的節點,則將丟擲NoNodeException
     * 如果給定版本與節點的版本不匹配,將丟擲BadVersionException
     * 設定的資料最大允許大小為1 MB
     */

    /**
     * 每一步操作/node3都要先執行該語句
     * 因為上一步建立的node3 添加了 digest ACL 所以在獲取該節點資訊的時候,要先新增授權
     */
    zookeeper1.addAuthInfo("digest","wangsaichao:123456".getBytes());

    /**
     * 同步設定資料 -1匹配任何版本
     */
    Stat stat = zookeeper1.setData("/node3", "嗨嘍".getBytes(), -1);
    System.out.println(stat);

    /**
     * 非同步設定資料
     */
    zookeeper1.setData("/node3", "helloword".getBytes(), -1, new AsyncCallback.StatCallback() {
      @Override
      public void processResult(int rc, String path, Object ctx, Stat stat) {

        System.out.println("rc:" + rc);
        System.out.println("path:" + path);
        System.out.println("ctx:" + ctx);
        System.out.println("stat:" + stat);

      }
    },"傳給服務端的內容,會在非同步回撥時傳回來");
    Thread.sleep(2000);

    //===============================刪除節點======================================

    /**
     * 刪除給定路徑的節點。如果存在這樣的節點,則呼叫將成功,並且給定版本與節點的版本匹配(如果給定版本為-1,則它匹配任何節點的版本)。
     * 如果節點不存在,將丟擲NoNodeException。
     * 如果給定版本與節點的版本不匹配,將丟擲BadVersionException。
     * 如果節點有子節點,將丟擲NotEmptyException。
     * 如果成功將觸發現有API呼叫留下的給定路徑節點上的所有監視,以及getChildren API呼叫留下的父節點上的監視。
     */



    /**
     * 同步刪除節點
     * path 節點路徑
     * version 版本號 -1 代表匹配所有版本
     */
    zookeeper1.delete("/node1",-1);

    /**
     * 非同步刪除節點
     */
    zookeeper1.delete("/node2", -1, new AsyncCallback.VoidCallback() {
      @Override
      public void processResult(int rc, String path, Object ctx) {
        System.out.println("rc:" + rc);
        System.out.println("path:" + path);
        System.out.println("ctx:" + ctx);
      }
    },"傳給服務端的內容,會在非同步回撥時傳回來");
    Thread.sleep(2000);

    //===============================判斷節點是否存在======================================

    /**
     * 返回給定路徑的節點的stat。如果不存在這樣的節點,則返回null。
     * 如果wathher非空並且呼叫成功(不會丟擲異常),則會在具有給定路徑的節點上保留監視。wather將由建立/刪除節點或在節點上設定資料的成功時觸發。
     */

    /**
     * 同步檢查節點是否存在,並留下監聽
     */
    Stat exists = zookeeper1.exists("/node2", new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        System.out.println("留下監視");
      }
    });
    System.out.println("判斷節點是否存在:"+exists);



    /**
     * 非同步檢查節點是否存在,並留下監聽
     */
    zookeeper1.exists("/node2", new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        System.out.println("留下監視");
      }
    }, new AsyncCallback.StatCallback() {
      @Override
      public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc:" + rc);
        System.out.println("path:" + path);
        System.out.println("ctx:" + ctx);
        System.out.println("判斷節點是否存在:" + stat);
      }
    },"傳給服務端的內容,會在非同步回撥時傳回來");


    //===============================ACL操作======================================

    /**
     * ACL操作只給 同步  ,非同步 自己看文件
     */


    /**
     * 每一步操作/node3都要先執行該語句
     * 因為上一步建立的node3 添加了 digest ACL 所以在獲取該節點資訊的時候,要先新增授權
     */



    /**
     * 設定ACL
     */
    //先註冊一個 helloworld 密碼為123456的使用者
    zookeeper1.addAuthInfo("digest","helloworld:123456".getBytes());
    //因為是 /node3 節點 所以還需要 先新增 /node3的授權
    zookeeper1.addAuthInfo("digest","wangsaichao:123456".getBytes());
    Stat auth = zookeeper1.setACL("/node3", Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("auth", "helloworld:123456"))), -1);
    System.out.println(auth);


    /**
     * 獲取ACL
     */
    List<ACL> acl = zookeeper1.getACL("/node3", new Stat());
    System.out.println(acl);


  }
  
  @Override
  public void process(WatchedEvent event) {
    System.out.println(