使用ZooKeeper Java API程式設計
https://www.cnblogs.com/IcanFixIt/p/7882107.html
https://blog.csdn.net/qiushisoftware/article/details/79043379
https://blog.csdn.net/wo541075754/article/details/65625481
ZooKeeper是用Java開發的,3.4.6版本的Java API文件可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上找到。
Tips
本章的程式碼在Linux作業系統下進行測試,執行ZooKeeper伺服器例項的版本為3.4.6。
開發應用程式的ZooKeeper Java繫結主要由兩個Java包組成:
- org.apache.zookeeper
- org.apache.zookeeper.data
org.apache.zookeeper包由ZooKeeper監視的介面定義和ZooKeeper的各種回撥處理程式組成。 它定義了ZooKeeper客戶端類庫的主要類以及許多ZooKeeper事件型別和狀態的靜態定義。 org.apache.zookeeper.data包定義了與資料暫存器(也稱為znode)相關的特性,例如訪問控制列表(ACL),IDs,stats等。
ZooKeeper Java API中的org.apache.zookeeper.server,org.apache.zookeeper.server.quorum和org.apache.zookeeper.server.upgrade包是伺服器實現的一部分。 org.apache.zookeeper.client包用於查詢ZooKeeper伺服器的狀態。
一 準備開發環境
Apache ZooKeeper是一個複雜的軟體,因此它需要執行許多其他類庫。 依賴庫作為jar檔案在ZooKeeper發行版中附帶在lib目錄中。 核心ZooKeeper jar檔名字為zookeeper-3.4.6.jar,位於主目錄下。
要開發Java的ZooKeeper應用程式,我們必須設定指向ZooKeeper jar的類路徑,以及ZooKeeper所依賴的所有第三方庫。在 bin 目錄下有一個 zkEnv.sh檔案,可以用來設定CLASSPATH。
我們需要將指令碼如下設定,在命令列中執行以下語句:
$ ZOOBINDIR=${ZK_HOME}/bin
$ source ${ ZOOBINDIR}/zkEnv.sh
shell變數ZK_HOME
被設定為安裝ZooKeeper的路徑,在我的設定中,它是/usr/share/zookeeper
。 之後,CLASSPATH變數被正確設定,在我的系統中,如下所示:
$ echo $CLASSPATH
/usr/share/zookeeper-3.4.6/bin/../build/classes :/usr/share/zookeeper-3.4.6/bin/../build/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar :/usr/share/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar :/usr/share/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar :/usr/share/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar :/usr/share/zookeeper-3.4.6/bin/../src/java/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../conf:
在Windows作業系統中,需要執行zkEnv.cmd指令碼。 現在可以使用CLASSPATH變數來編譯和執行使用ZooKeeper API編寫的Java程式。 可以在Uni/Linux中的主目錄的.bashrc檔案中找到zkEnv.sh指令碼,避免每次啟動shell會話時都採用它。
二 第一個ZooKeeper程式
為了引入ZooKeeper Java API,讓我們從一個非常簡單的程式開始,它可以連線到localhost中的ZooKeeper例項,如果連線成功,它將在ZooKeeper名稱空間的根路徑下列印znode的列表。
這個程式的程式碼如下所示:
/*Our First ZooKeeper Program*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
public class HelloZooKeeper {
public static void main(String[] args) throws IOException {
String hostPort = "localhost:2181";
String zpath = "/";
List <String> zooChildren = new ArrayList<String>();
ZooKeeper zk = new ZooKeeper(hostPort, 2000, null);
if (zk != null) {
try {
zooChildren = zk.getChildren(zpath, false);
System.out.println("Znodes of '/': ");
for (String child: zooChildren) {
//print the children
System.out.println(child);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在構建和執行前面的程式碼片段之前,讓我們來看看它具體做了什麼。程式碼從匯入語句開始。使用這些語句,我們匯入了程式各個元件所需的包。如前所述,org.apache.zookeeper包包含客戶端與ZooKeeper伺服器進行互動所需的所有類和介面。在匯入包之後,定義了一個名為HelloZooKeeper
的類。由於我們要連線到在同一系統中執行的ZooKeeper例項,在main
方法中將主機和埠字串定義為localhost:2181
。程式碼行zk = new ZooKeeper(hostPort, 2000, null)
呼叫ZooKeeper構造方法,該構造方法嘗試連線到ZooKeeper伺服器並返回一個引用。對於連線到ZooKeeper伺服器例項並維護該連線的客戶端程式,需要維護一個實時會話。在此例中,構造方法例項化的zk
物件返回的引用表示這個會話。 ZooKeeper API是圍繞這個引用構建的,每個方法呼叫都需要一個引用來執行。
ZooKeeper類的構造方法使用以下程式碼建立ZooKeeper例項的引用:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
使用的引數含義如下:
- connectString:以逗號分隔的主機:埠號列表,每個對應一個ZooKeeper伺服器。 例如,10.0.0.1:2001,10.0.0.2:2002和10.0.0.3:2003表示三個節點的ZooKeeper ensemble的有效的主機:埠匹配對。
- sessionTimeout:這是以毫秒為單位的會話超時時間。這是ZooKeeper在宣佈session結束之前,沒有從客戶端獲得心跳的時間。
- watcher:一個watcher物件,如果建立,當狀態改變和發生節點事件時會收到通知。這個watcher物件需要通過一個使用者定義的類單獨建立,通過實現
Watcher
介面並將例項化的物件傳遞給ZooKeeper構造方法。客戶端應用程式可以收到各種型別的事件的通知,例如連線丟失、會話過期等。
ZooKeeper Java API定義了另外帶有三個引數的構造方法,以指定更高階的操作。程式碼如下:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
在ZooKeeper類的上面的構造方法中,如果設定為true,boolean canBeReadOnly
引數允許建立的客戶端在網路分割槽的情況下進入只讀模式。只讀模式是客戶端無法找到任何多數伺服器的場景,但有一個可以到達的分割槽伺服器,以只讀模式連線到它,這樣就允許對伺服器的讀取請求,而寫入請求則不允許。客戶端繼續嘗試在後臺連線到大多數伺服器,同時仍然保持只讀模式。分割槽伺服器僅僅是ZooKeeper組的一個子集,它是由於叢集中的網路分配而形成的。大多數伺服器構成了ensemble中的大多數quorum。
以下構造方法顯示了兩個附加引數的定義:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
這個構造方法允許ZooKeeper客戶端物件建立兩個額外的引數:
- sessionId:在客戶端重新連線到ZooKeeper伺服器的情況下,可以使用特定的會話ID來引用先前連線的會話
- sessionPasswd:如果指定的會話需要密碼,可以在這裡指定
以下構造方法是前兩個呼叫的組合:
ZooKeeper(String connectString, int sessionTimeout,Watcher watcher, long sessionId, byte[] sessionPasswd,boolean canBeReadOnly)
此構造方法是前兩個呼叫的組合,允許在啟用只讀模式的情況下重新連線到指定的會話。
Note
ZooKeeper類的詳細Java API文件可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上查詢。
現在,回到我們的ZooKeeper程式。 在呼叫構造方法後,如果連線成功,我們將得到ZooKeeper伺服器的引用。 我們通過下面的程式碼將引用傳遞給getChildren
方法:
zooChildren = zk.getChildren(zpath, false)
ZooKeeper類的getChildren(String path,boolean watch)
方法返回給定路徑上znode的子級列表。 我們只是迭代這個方法返回的列表,並將字串列印到控制檯。
將程式命名為HelloZooKeeper.java,並編譯我們的程式如下:
$ javac -cp $CLASSPATH HelloZooKeeper.java
在我們執行的程式之前,需要使用以下命令來啟動ZooKeeper伺服器例項:
$ ${ZK_HOME}/bin/zkServer.sh start
執行程式如下:
$ java -cp $CLASSPATH HelloZooKeeper
執行程式會在控制檯上列印日誌訊息,顯示所使用的ZooKeeper版本,Java版本,Java類路徑,伺服器體系結構等等。 這裡顯示了這些日誌訊息的一部分:
ZooKeeper Java API生成的日誌訊息對除錯非常有用。 它為我們提供了關於客戶端連線到ZooKeeper伺服器,建立會話等後臺得資訊。 上面顯示的最後三條日誌訊息告訴我們客戶端如何使用程式中指定的引數來啟動連線,以及在成功連線後,伺服器如何為客戶端分配會話ID。
最後,程式執行最後在控制檯中輸出以下內容:
我們可以使用ZooKeeper shell來驗證程式的正確性:
$ $ZK_HOME/bin/zkCli.sh -server localhost
恭喜! 我們剛剛成功編寫了我們的第一個ZooKeeper客戶端程式。
二 實現Watcher介面
ZooKeeper Watcher監視使客戶端能夠接收來自ZooKeeper伺服器的通知,並在發生時處理這些事件。 ZooKeeper Java API提供了一個名為Watcher
的公共介面,客戶端事件處理程式類必須實現該接口才能接收有關來自ZooKeeper伺服器的事件通知。 以程式設計方式,使用這種客戶端的應用程式通過向客戶端註冊回撥(callback)物件來處理這些事件。
我們將實現Watcher
介面,處理與znode關聯的資料更改時由ZooKeeper生成的事件。
Watcher
介面在org.apache.zookeeper包中宣告如下:
public interface Watcher {
void process(WatchedEvent event);
}
為了演示znode資料監視器(Watcher),有兩個Java類:DataWatcher
和DataUpdater
。 DataWatcher
將一直執行,並在/MyConfig
指定znode路徑中偵聽來自ZooKeeper伺服器的NodeDataChange
事件。DataUpdate
r類將定期更新此znode路徑中的資料欄位,這將生成事件,並且在接收到這些事件後,DataWatcher
類將把更改後的資料列印到控制檯上。
以下是DataWatcher.java
類的程式碼:
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class DataWatcher implements Watcher, Runnable {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
byte zoo_data[] = null;
ZooKeeper zk;
public DataWatcher() {
try {
zk = new ZooKeeper(hostPort, 2000, this);
if (zk != null) {
try {
//Create the znode if it doesn't exist, with the following code:
if (zk.exists(zooDataPath, this) == null) {
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void printData() throws InterruptedException, KeeperException {
zoo_data = zk.getData(zooDataPath, this, null);
String zString = new String(zoo_data);
// The following code prints the current content of the znode to the console:
System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString);
}
@Override
public void process(WatchedEvent event) {
System.out.printf(
"\nEvent Received: %s", event.toString());
//We will process only events of type NodeDataChanged
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
printData();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args)
throws InterruptedException, KeeperException {
DataWatcher dataWatcher = new DataWatcher();
dataWatcher.printData();
dataWatcher.run();
}
public void run() {
try {
synchronized (this) {
while (true) {
wait();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
我們來看一下DataWatcher.java
類的程式碼來理解一個ZooKeeper監視器的實現。 DataWatcher
公共類實現Watcher
介面以及Runnable
介面,打算將監視器作為執行緒執行。 main
方法建立DataWatcher
類的一個例項。 在前面的程式碼中,DataWatcher
構造方法嘗試連線到在本地主機上執行的ZooKeeper例項。 如果連線成功,我們用下面的程式碼檢查znode路徑/MyConfig
是否存在:
if (zk.exists(zooDataPath, this) == null) {
如果znode不存在ZooKeeper名稱空間中,那麼exists
方法呼叫將返回null,並且嘗試使用程式碼將其建立為持久化znode,如下所示:
zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
接下來是process
方法,它在org.apache.ZooKeeper的Watcher
介面中宣告,並由DataWatcher
類使用以下程式碼實現:
public void process(WatchedEvent event) {
為了簡單起見,在process
方法中,列印從ZooKeeper例項接收的事件,並僅對NodeDataChanged
型別的事件進行進一步處理,如下所示:
if (event.getType() == Event.EventType.NodeDataChanged)
當znode路徑/MyConfig
的資料欄位發生任何更新或更改而收到NodeDataChanged
型別的事件時,呼叫printData
方法來列印znode的當前內容。 在znode上執行一個getData
呼叫時,我們再次設定一個監視,這是該方法的第二個引數,如下面的程式碼所示:
zoo_data = zk.getData(zooDataPath, this, null);
監視事件是傳送給設定監視的客戶端的一次性觸發器,為了不斷接收進一步的事件通知,客戶端應該重置監視器。
DataUpdater.java
是一個簡單的類,它連線到執行本地主機的ZooKeeper例項,並用隨機字串更新znode路徑/MyConfig
的資料欄位。 在這裡,我們選擇使用通用唯一識別符號(UUID)字串更新znode,因為後續的UUID生成器呼叫將保證生成唯一的字串。
DataUpdater.java
類程式碼如下:
import java.io.IOException;
import java.util.UUID;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DataUpdater implements Watcher {
private static String hostPort = "localhost:2181";
private static String zooDataPath = "/MyConfig";
ZooKeeper zk;
public DataUpdater() throws IOException {
try {
zk = new ZooKeeper(hostPort, 2000, this);
} catch (IOException e) {
e.printStackTrace();
}
}
// updates the znode path /MyConfig every 5 seconds with a new UUID string.
public void run() throws InterruptedException, KeeperException {
while (true) {
String uuid = UUID.randomUUID().toString();
byte zoo_data[] = uuid.getBytes();
zk.setData(zooDataPath, zoo_data, -1);
try {
Thread.sleep(5000); // Sleep for 5 secs
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws
IOException, InterruptedException, KeeperException {
DataUpdater dataUpdater = new DataUpdater();
dataUpdater.run();
}
@Override
public void process(WatchedEvent event) {
System.out.printf("\nEvent Received: %s", event.toString());
}
}
上面的程式碼使ZooKeeper伺服器觸發一個NodeDataChanged
事件。 由於DataWatcher
為此znode路徑設定了監視,因此它會接收資料更改事件的通知。 然後它檢索更新的資料,重置監視,並在控制檯上列印資料。
使用以下命令編譯DataWatcher
和DataUpdater
類:
$ javac –cp $CLASSPATH DataWatcher.java
$ javac –cp $CLASSPATH DataUpdater.java
要執行監視器和更新程式,需要開啟兩個終端視窗。 我要先執行監視器,因為它建立了/MyConfig
的znode(如果還未在ZooKeeper的名稱空間中建立的話)。 執行監視器之前,請確保ZooKeeper伺服器在本地主機上已經執行。
在其中一個終端視窗中,通過執行以下命令來執行watcher類:
$ java –cp $CLASSPATH DataWatcher
輸出類似於以下螢幕截圖所示的訊息:
如前面的截圖所示,znode路徑/MyConfig
是由DataWatcher
類建立的。 它也列印znode的內容,但沒有列印在控制檯中,因為我們在建立znode時沒有設定任何資料。 當znode被建立時,類中的監視者收到了NodeCreated
型別的事件通知,這個通知被列印在控制檯中。 DataWatcher
類繼續執行,並從ZooKeeper伺服器偵聽/MyConfig
節點上的事件。
讓我們在另一個終端視窗中執行DataUpdater
類:
$ java -cp $CLASSPATH DataUpdater
將最初的ZooKeeper特定日誌訊息記錄到控制檯後,DataUpdater
類執行時沒有提示。 它將一個新的UUID字串設定到ZooKeeper路徑/MyConfig
的資料欄位中。 因此,看到每隔5秒鐘,在下面的螢幕截圖中顯示的輸出內容列印在執行DataWatche
的終端視窗中:
DataWatcher
也可以使用ZooKeeper shell進行測試。 繼續像以前一樣在終端中執行DataWatcher
類,並在另一個終端中呼叫ZooKeeper shell並執行以下螢幕截圖中所示的命令:
在DataWatcher正在執行的終端中,將列印以下訊息:
三 示例——群集監視器
通過網際網路提供的流行服務,如電子郵件,檔案服務平臺,線上遊戲等,都是通過跨越多個數據中心的高度可用的成百上千臺伺服器來服務的,而這些伺服器通常在地理位置上分開。 在這種叢集中,設定了一些專用的伺服器節點來監視生產網路中承載服務或應用程式的伺服器的活躍性。 在雲端計算環境中,也用於管理雲環境的這種監控節點被稱為雲控制器。 這些控制器節點的一個重要工作是實時檢測生產伺服器的故障,並相應地通知管理員,並採取必要的措施,例如將故障伺服器上的應用程式故障轉移到另一個伺服器,從而確保容錯性和高可用性。
在本節中,我們將使用ZooKeeper Java客戶端API開發一個簡約的分散式叢集監視器模型。 使用ZooKeeper的ephemeral znode概念來構建這個監視模型相當簡單和優雅,如以下步驟所述:
- 每個生產伺服器執行一個ZooKeeper客戶端作為守護程序。 這個過程連線到ZooKeeper伺服器,並在
/ZooKeeper
名稱空間的預定義路徑(比如/Members
)下建立一個帶有名稱(最好是其網路名稱或主機名)的ephemeral znode。 - 雲控制器節點執行ZooKeeper監視器程序,