1. 程式人生 > >ZooKeeper API使用 I 建立節點

ZooKeeper API使用 I 建立節點

客戶端可以通過 ZooKeeper 的 API 來建立一個數據節點,有如下兩個介面:

String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
void create(final String path, byte date[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

這兩個介面分別以同步非同步方式建立節點,API 方法的引數說明如表:

引數名 說明
path 需要建立的資料節點的節點路徑。比如,/zk-book/foo。
data[] 一個位元組陣列,是節點建立後的初始內容。
acl 節點的 ACL 策略。
createMode 節點型別,是一個列舉型別,通常有4種可選的節點型別:持久(PERSISTENT)持久順序(PERSISTENT_SEQUENTIAL)臨時(EPHEMERAL)臨時順序(EPHEMERAL_SEQUENTIAL)。關於ZNode的節點特性,將在後面做詳解介紹。
cb 註冊一個非同步回撥函式。開發人員需要實現StringCallBack介面
,主要是對下面這個方法的重寫:void processResult(int rc, String path, Object ctx, String name);當服務端節點建立完畢後,ZooKeeper 客戶端就會自動呼叫這個方法,這樣就可以處理相關的業務邏輯了。
ctx 用於傳遞一個物件,可以在毀掉方法執行的時候使用,通常是一個上下文(Context)資訊。

需要注意幾點,無論是同步還是非同步介面,ZooKeeper 都不支援遞迴建立,即無法再父節點不存在的情況下建立一個子節點。另外,如果一個節點已經存在了,那麼建立同名節點的時候,會丟擲NodeExistException異常

目前,ZooKeeper 的節點內容只支援位元組陣列(byte[])型別。也就是說,ZooKeeper 不負責為節點內容進行序列化,開發人員需要自己使用序列化工具將節點內容進行序列化和反序列化。對於字串,可以簡單地使用“String”.getBytes()來生成一個位元組陣列;對於其他複雜物件,可以使用 Hessian 或是 Kryo 等專門的序列化工具來進行序列化。

關於許可權控制,如果你的應用場景沒有太高的許可權要求,那麼可以不關注這個引數,只需要在 acl 引數中傳入引數Ids.OPEN_ACL_UNSAFE,這就表明之後對這個節點的任何操作都不受許可權控制。關於 ZooKeeper 的許可權控制,將在後面做纖細介紹。

使用同步API建立一個節點

package com.lpf.zookeeper.ZooKeeper.create;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

// ZooKeeper API建立節點,使用同步(sync)介面
public class ZooKeeper_Create_API_Sync_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) {
        try {
            ZooKeeper zookeeper = new ZooKeeper("localhost:2181", 5000, new ZooKeeper_Create_API_Sync_Usage());
            connectedSemaphore.await();

            String path1 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("Success create znode:" + path1);

            String path2 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("Success create znode:" + path2);

            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

執行程式,輸出結果如下:

Success create znode:/zk-test-ephemeral-
Success create znode:/zk-test-ephemeral-0000000003

在上面這個程式片段中,使用了同步的節點建立介面:String create(final String path,byte data[],List<ACL> acl,CreateMode createMode)。在介面使用中,我們分別建立了兩種型別的節點:臨時節點和臨時順序節點。從返回的結果可以看出,如果建立了臨時節點,那麼API的返回值就是當時傳入的path引數;如果建立了臨時順序節點,那麼ZooKeeper 會自動在節點後加上一個數字,並且在 API 介面的返回值中返回該資料節點的一個完整的節點路徑。

使用非同步API建立一個節點

package com.lpf.zookeeper.ZooKeeper.create;

import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;

import java.util.concurrent.CountDownLatch;

// ZooKeeper API建立節點,使用非同步(async)介面
public class ZooKeeper_Create_API_ASync_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) {
        try {
            ZooKeeper zookeeper = new ZooKeeper("localhost:2181", 5000, new ZooKeeper_Create_API_ASync_Usage());
            connectedSemaphore.await();

            zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                    new IStringCallback(), "I am contest.");

            zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL, new IStringCallback(), "I am context.");

            zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL, new IStringCallback(), "I am context.");

            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

class IStringCallback implements AsyncCallback.StringCallback {

    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("Create path result:【" + rc + "," + path + "," + ","
                + ctx + ", real path name:" + name + "】");
    }
}

執行程式,輸出結果如下:

Create path result:【0,/zk-test-ephemeral-,,I am contest., real path name:/zk-test-ephemeral-】
Create path result:【-110,/zk-test-ephemeral-,,I am context., real path name:null】
Create path result:【0,/zk-test-ephemeral-,,I am context., real path name:/zk-test-ephemeral-0000000007

從這個程式片段中可以看出,使用非同步方式建立介面也很簡單。使用者僅僅需要實現 AsyncCallback.StringCallback() 介面即可。AsyncCallback 包含了 StatCallbackDataCalbackACLCallbackChildrenCallbackChildren2CallbackStringCallbackVoidCallback 七種不同的回撥介面,使用者可以在不同的非同步介面中實現不同的介面。

和同步介面最大的區別在於,節點的建立過程(包括網路通訊和服務端的節點建立過程)是非同步的。並且,在同步介面呼叫過程中,我們需要關注介面丟擲異常的可能;但是在非同步介面中,介面本身是不會丟擲異常的、所有的異常都會在回撥函式中通過Result Code(響應嗎)來實現。

下面來重點看下回調方法:void processResult(int rc, String path, Object ctx, String name)。這個方法的幾個引數主要如下表。

引數名 說明
rc Result Code,服務端響應碼。客戶端可以從這個響應碼中識別出API呼叫的結果,常見的響應碼如下:0(OK):介面呼叫成功-4(ConnectionLoss):客戶端和服務端連線已斷開-110(NodeExists):指定節點已存在-112(SessionExpired):會話已過期
path 介面呼叫時傳入API的資料節點路徑引數值
ctx 介面呼叫時傳入API的ctx引數值
name 實際在服務端建立的節點名。在上述程式碼中,第三次建立節點時,由於建立的節點型別是順序節點,因此在服務端沒有真正建立好順序節點之前,客戶端無法知道節點的完整節點路徑。於是,在回撥方法中,服務端會返回這個資料節點的完整節點路徑。