1. 程式人生 > >深入ZooKeeper——開始使用ZooKeeper的API

深入ZooKeeper——開始使用ZooKeeper的API

設定ZooKeeper的CLASSPATH

我們需要設定正確的classpath,以便執行或編譯ZooKeeper的Java程式碼。除了ZooKeeper的JAR包外,ZooKeeper使用了大量的第三方庫。為了簡化輸入和方便閱讀,我們使用環境變數CLASSPATH來表示所有必需的庫。
ZooKeeper發行包中bin目錄下的zkEnv.sh指令碼會為我們設定該環境變數。我們需要使用以下方式來編碼:
在這裡插入圖片描述
(在Windows上,使用call命令呼叫,而不是使用zkEnv.cmd指令碼。)
一旦執行這個指令碼,環境變數CLASSPATH就會正確設定。我們在編譯和執行Java程式時用到它。

建立ZooKeeper會話

ZooKeeper的API圍繞ZooKeeper的控制代碼(handle)而構建,每個API呼叫都需要傳遞這個控制代碼。這個控制代碼代表與ZooKeeper之間的一個會話。
建立ZooKeeper控制代碼的建構函式如下所示:
在這裡插入圖片描述

  • connectString
    包含主機名和ZooKeeper伺服器的埠。我們之前通過zkCli連線ZooKeeper服務時,已經列出過這些伺服器。
  • sessionTimeout
    以毫秒為單位,表示ZooKeeper等待客戶端通訊的最長時間,之後會宣告會話已死亡。
  • watcher
    用於接收會話事件的一個物件,這個物件需要我們自己建立。
    Wacher被定義為介面
    ,所以我們需要自己實現一個類,然後初始化這個類的例項並傳入ZooKeeper的建構函式中。客戶端使用Watcher介面來監控與ZooKeeper之間會話的健康情況。與ZooKeeper伺服器之間建立或失去連線時就會產生事件。它們同樣還能用於監控ZooKeeper資料的變化。最終,如果與ZooKeeper的會話過期,也會通過Watcher介面傳遞事件來通知客戶端的應用。

實現一個Watcher
為了從ZooKeeper接收通知,我們需要實現監視點。首先讓我們進一步瞭解Watcher介面,該介面的定義如下
在這裡插入圖片描述
這個介面沒有多少內容,我們不得不自己實現,但現在我們只是簡單地輸出事件。所以,讓我們從一個名為Master的類開始實現示例:

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
public class Master implements Watcher {
	ZooKeeper zk;
	String hostPort;
	Master(String hostPort) {
		this.hostPort = hostPort;}
	void startZK() {
		zk = new ZooKeeper(hostPort, 15000, this);
	}
	public void process(WatchedEvent e) {
		System.out.println(e);}
	public static void main(String args[])
		throws Exception {
		Master m = new Master(args[0]);
		m.startZK();
		// wait for a bit
		Thread.sleep(60000);
	}
}

①在建構函式中,我們並未例項化ZooKeeper物件,而是先儲存hostPort留著後面使用。Java最佳實踐告訴我們,一個物件的建構函式沒有完成前不要呼叫這個物件的其他方法。因為這個物件實現了Watcher,並且當我們例項化ZooKeeper物件時,其Watcher的回撥函式就會被呼叫,所以我們需要Master的建構函式返回後再呼叫ZooKeeper的建構函式。
②這個簡單的示例沒有提供複雜的事件處理邏輯,而只是將我們收到的事件進行簡單的輸出。

通過以下方式就可以編譯這個簡單的例子:
在這裡插入圖片描述
當我們編譯完Master.java這個檔案後,執行它並檢視結果:
在這裡插入圖片描述
ZooKeeper客戶端API產生很多日誌訊息,使使用者可以瞭解發生了什麼。

ZooKeeper管理連線
請不要自己試著去管理ZooKeeper客戶端連線。ZooKeeper客戶端庫會監控與服務之間的連線,客戶端庫不僅告訴我們連線發生問題,還會主動嘗試重新建立通訊。一般客戶端開發庫會很快重建會話,以便最小化應用的影響。所以不要關閉會話後再啟動一個新的會話,這樣會增加系統負載,並導致更長時間的中斷。

獲取管理權

為了確保同一時間只有一個主節點程序出於活動狀態,我們使用ZooKeeper來實現簡單的群首選舉演算法。
ZooKeeper通過外掛式的認證方法提供了每個節點的ACL策略功能,因此,如果我們需要,就可以限制某個使用者對某個znode節點的哪些許可權。
當然,我們希望在主節點死掉後/master節點會消失,我們可以使用ZooKeeper
的臨時性znode節點來達到我們的目的。
因此,我們將會在我們的程式中新增以下程式碼:

String serverId = Integer.toHexString(random.nextInt());
void runForMaster() {
	zk.create("/master",①
		serverId.getBytes(),②
		OPEN_ACL_UNSAFE,③
		CreateMode.EPHEMERAL);}

①我們試著建立znode節點/master。如果這個znode節點存在,create就會失敗。同時我們想在/master節點的資料欄位儲存對應這個伺服器的唯一ID。
②資料欄位只能儲存位元組陣列型別的資料,所以我們將int型轉換為一個位元組陣列。
③如之前所提到的,我們使用開放的ACL策略。
④我們建立的節點型別為EPHEMERAL(臨時)。

然而,我們這樣做還不夠,create方法會丟擲兩種異常:
KeeperException和InterruptedException。我們需要確保我們處理了這兩種異常,特別是ConnectionLossException(KeeperException異常的子類)和InterruptedException。對於其他異常,我們可以忽略並繼續執行,但對於這兩種異常,create方法可能已經成功了,所以如果我們作為主節點就需要捕獲並處理它們。

當處理ConnectionLossException異常時,我們需要找出那個程序建立的/master節點,如果程序是自己,就開始成為群首角色。我們通過getData方法來處理:
在這裡插入圖片描述

  • path
    類似其他ZooKeeper方法一樣,第一個引數為我們想要獲取資料的znode節點路徑。
  • watch
    表示我們是否想要監聽後續的資料變更。如果設定為true,我們就可以通過我們建立ZooKeeper控制代碼時所設定的Watcher物件得到事件,同時另一個版本的方法提供了以Watcher物件為入參,通過這個傳入的物件來接收變更的事件。
    現在我們設定這個引數為false,因為我們現在我們只想知道當前的資料是什麼。
  • stat
    最後一個引數型別Stat結構,getData方法會填充znode節點的元資料資訊。

讓我們按以下程式碼段來修改程式碼,在runForMaster方法中引入異常處理:

String serverId = Integer.toString(Random.nextLong());
boolean isLeader = false;
// returns true if there is a master
boolean checkMaster() {
	while (true) {
		try {
			Stat stat = new Stat();
			byte data[] = zk.getData("/master", false, stat);①
			isLeader = new String(data).equals(serverId));return true;
		} catch (NoNodeException e) {
			// no master, so try create again
			return false;
		} catch (ConnectionLossException e) {
		}
	}
}
void runForMaster() throws InterruptedException {while (true) {
		try {④
			zk.create("/master", serverId.getBytes(),
				OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);⑤
			isLeader = true;
			break;
		} catch (NodeExistsException e) {
			isLeader = false;
		break;
		} catch (ConnectionLossException e) {}
		if (checkMaster()) break;}
}

①通過獲取/master節點的資料來檢查活動主節點。
②該行展示了為什麼我們需要使用在建立/master節點時儲存的資料:如果/master存在,我們使用/master中的資料來確定誰是群首。如果一個程序捕獲到ConnectionLossException,這個程序可能就是主節點,因create操作實際上已經處理完,但響應訊息卻丟失了。
③我們將InterruptedException異常簡單地傳遞給呼叫者。
④我們將zk.create方法包在try塊之中,以便我們捕獲並處理ConnectionLossException異常。
⑤這裡為create請求,如果成功執行將會成為主節點。
⑥處理ConnectionLossException異常的catch塊的程式碼為空,因為我們並不想中止函式,這樣就可以使處理過程繼續向下執行。
⑦檢查活動主節點是否存在,如果不存在就重試。

現在,我們看一下Master的main主函式:

public static void main(String args[]) throws Exception {
	Master m = new Master(args[0]);
	m.startZK();
	m.runForMaster();if (isLeader) {
		System.out.println("I'm the leader");// wait for a bit
		Thread.sleep(60000);
	} else {
		System.out.println("Someone else is the leader");
	}
	m.stopZK();
}

其他暫略。