ZooKeeper客戶端curator元件介紹
Curator framework提供了高階API, 極大的簡化了ZooKeeper的使用。 它在ZooKeeper基礎上增加了很多特性,可以管理與ZOoKeeper的連線和重試機制。這些特性包括:
-
自動連線管理
** 有些潛在的錯誤情況需要讓ZooKeeper client重建連線和重試。Curator可以自動地和透明地處理這些情況 -
Cleaner API
簡化原始的ZooKeeper方法,事件等 提供現代的流式介面 -
技巧(Recipe)實現
Leader選舉 共享鎖
Path快取和監控 分散式佇列
分散式優先順序佇列 …
產生Curator framework 例項
使用CuratorFrameworkFactory產生framework例項。 CuratorFrameworkFactory 既提供了factory方法也提供了builder來建立例項。 CuratorFrameworkFactory是執行緒安全的
工廠方法(newClient())提供了一個簡單的方式建立例項。Builder可以使用更多的引數控制生成的例項。一旦生成framework例項, 必須呼叫start方法啟動它。應用結束時應該呼叫close方法關閉它。
CuratorFramework API
CuratorFramework 使用流程風格的介面。 程式碼勝於說教:
1234 |
client.create().forPath("/head", new byte[0]);client.delete().inBackground().forPath("/head" |
方法
方法名 | 描述 |
---|---|
create() | 開始一create操作. 可以呼叫額外的方法(mode or background),最後呼叫forPath() |
delete() | 開始一個delete操作. 呼叫額外的方法(version or background) , 最好呼叫forPath() |
checkExists() | 開始一個檢查ZNode是否存在的操作. 呼叫額外的方法 (watch or background), 最後呼叫forPath() |
getData() | 開始一個獲取ZNode節點資料的操作. 呼叫額外的方法(watch, background or get stat), 最後呼叫forPath() |
setData() | 開始一個設定ZNode節點資料的操作. 呼叫額外的方法(version or background), 最後呼叫forPath() |
getChildren() | 開始一個獲取ZNode的子節點列表的操作.呼叫額外的方法(watch, background or get stat), 最後呼叫forPath() |
inTransaction() | 開始一個原子的ZooKeeper事務. 可以包含 create, setData, check, and/or delete 操作的組合, 然後commit() 作為一個原子操作. |
通知 Notifications
服務於後臺操作和監控(watch)的通知通過ClientListener介面釋出。你通過CuratorFramework例項的addListener方法可以註冊監聽器。
Interface CuratorListener
1 | eventReceived() A background operation has completed or a watch has triggered. Examine the given event for details |
Interface ConnectionStateListener:
12 | stateChanged(CuratorFramework client, ConnectionState newState) Called when there is a state change in the connection |
Interface UnhandledErrorListener:
12 | unhandledError(String message, Throwable e) Called when an exception is caught in a background thread, handler, etc. |
ClientEvent
ClientEvent是事件父類, 代表後臺通知和監控的型別。 ClientEvent有用的欄位根據事件的型別(getType()方法獲取)不同而不同。
Event Type | Event Methods |
---|---|
CREATE | getResultCode() and getPath() |
DELETE | getResultCode() and getPath() |
EXISTS | getResultCode(), getPath() and getStat() |
GETDATA | getResultCode(), getPath(), getStat() and getData() |
SETDATA | getResultCode(), getPath() and getStat() |
CHILDREN | getResultCode(), getPath(), getStat(), getChildren() |
WATCHED | getWatchedEvent() |
名稱空間 namespace
因為ZOoKeeper是一個共享的叢集。所以名稱空間約定極為重要,各個應用在使用同一叢集時不會有衝突的ZK path。
CuratorFramework 提供了名稱空間的概念。當生成CuratorFramework 可以設定名稱空間。CuratorFramework在呼叫API會在所有的path前面加上名稱空間。
1234 | CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build(); ...client.create().forPath("/test", data);// node was actually written to: "/MyApp/test" |
臨時連線
Temporary CuratorFramework instances are meant for single requests to ZooKeeper ensembles over a failure prone network such as a WAN.
臨時的CuratorFramework用來發送單獨請求通過一個易出錯的網路如WAN。CuratorTempFramework 的API是有限制的,而且,連線在不用一段時間後會被關閉。
這個想法基於Camille Fournier的文章: http://whilefalse.blogspot.com/2012/12/building-global-highly-available.html
建立CuratorTempFramework
就像正常的CuratorFramework 例項一樣,CuratorTempFramework依然通過CuratorFrameworkFactory 建立。但是最後不是呼叫build()方法, 而是buildTemp()。buildTemp()建立建立CuratorTempFramework然後在它不用三分鐘後就關閉它。有個buildTemp()過載版本可以設定不活躍(不用)的時間。
Limited API
CuratorTempFramework提供了下列方法:
1234567891011121314151617181920 | /** * Stop the client */ public void close(); /** * Start a transaction builder * * @return builder object * @throws Exception errors */ public CuratorTransaction inTransaction() throws Exception; /** * Start a get data builder * * @return builder object * @throws Exception errors */ public TempGetDataBuilder getData() throws Exception; |
Apache Curator Utilities
Curator提供了一組工具類和方法用來測試基於Curator的應用。 並且提供了操作ZNode輔助類以及其它一些資料結構
Test Server
curator-test提供了TestingServer類。 這個類建立了一個本地的, 同進程的ZooKeeper伺服器用來測試。
Test Cluster
curator-test提供了TestingCluster類。 這個類建立了一個內部的ZooKeeper叢集用來測試。
ZKPaths
提供了各種靜態方法來操作ZNode:
- getNodeFromPath: 從一個全路徑中得到節點名, 比如 “/one/two/three” 返回 “three”
- mkdirs: 確保所有的節點都已被建立
- getSortedChildren: 得到一個給定路徑的子節點, 按照sequence number排序
- makePath: 給定父路徑和子節點,建立一個全路徑
EnsurePath
確保一個特定的路徑被建立。當它第一次使用時,一個同步ZKPaths.mkdirs(ZooKeeper, String)呼叫被觸發來確保完整的路徑都已經被建立。後續的呼叫將不是同步操作.
用法:
12345678 | EnsurePath ensurePath = new EnsurePath(aFullPathToEnsure);...String nodePath = aFullPathToEnsure + "/foo";ensurePath.ensure(zk); // first time syncs and creates if neededzk.create(nodePath, ...);...ensurePath.ensure(zk); // subsequent times are NOPszk.create(nodePath, ...); |
注意: 此方法namespace會參與路徑名字的建立。
BlockingQueueConsumer
提供JDK BlockingQueue類似的行為。
QueueSharder
由於zookeeper傳輸層的限制, 單一的佇列如果超過10K的元素會被分割(break)。 這個類為多個分散式佇列提供了一個facade。 它監控佇列, 如果一個佇列超過這個閾值, 一個新的佇列就被建立。 在這些佇列中Put是分散式的。
Reaper and ChildReaper
Reaper
可以用來刪除鎖的父路徑。定時檢查路徑被加入到reaper中。 當檢查時,如果path沒有子節點/路徑, 此路徑將被刪除。每個應用中CLient應該只建立一個reaper例項。必須將lock path加到這個readper中。 reaper會定時的檢查刪除它們。
ChildReaper
用來清除父節點下所有的空節點。定時的呼叫getChildren()並將空節點加入到內部管理的reaper中。
- 注意: 應該考慮使用LeaderSelector來執行Reapers, 因為它們不需要在每個client執行.
Apache Curator Client
Curator client使用底層的API, 強烈推薦你是用Curator Framework代替使用CuratorZookeeperClient
背景
CuratorZookeeperClient 是ZOoKeeper client的包裝類。但是提供了更簡單方式, 而且可以減少錯誤的發生。它提供了下列的特性:
- 持續的連線管理 - ZooKeeper有很多的關於連線管理的警告(你可以到ZooKeeper FAQ檢視細節)。CuratorZookeeperClient 可以自動的管理這些事情。
- retry - 提供一個方式處理retry。
- Test ZooKeeper server - 提供一個程序內的ZooKeeper測試伺服器用來測試和實驗。
方法
Method | Description |
---|---|
Constructor | 建立一個給定ZooKeeper叢集的連線。 你可以傳入一個可選的watcher. 必須提供Retry策略 |
getZooKeeper() | 返回管理的ZooKeeper例項. 重要提示: a) 它會花費些許時間等待連線來完成, 在使用其它方法之前你應該校驗連線是否完成. b) 管理的ZooKeeper例項可以根據特定的事件而改變。 不要持有例項太長時間. 總是呼叫getZooKeeper()得到一個新的例項. |
isConnected() | 返回ZooKeeper client當前連線狀態 |
blockUntilConnectedOrTimedOut() | block知道連線成功或者超時 |
close() | 關閉連線 |
setRetryPolicy() | 改變retry策略 |
newRetryLoop() | 分配一個新的Retry Loop - 詳情看下邊 |
Retry Loop
由於各種各樣的原因, 在zookeeper叢集上的操作難免遇到失敗的情況。最佳實踐表明應該提供重試機制。Retry Loop 為此而生。 每個操作都被包裝在一個Retry Loop中。下面是一個典型的處理流程:
12345678910111213141516 | RetryLoop retryLoop = client.newRetryLoop();while ( retryLoop.shouldContinue() ){ try { // perform your work ... // it's important to re\-get the ZK instance as there may have been an error and the instance was re\-created ZooKeeper zk = client.getZookeeper(); retryLoop.markComplete(); } catch ( Exception e ) { retryLoop.takeException(e); }} |
Retry Loop維護一定數量的retry, 它還決定一個錯誤是否可以要執行retry。 假如一個錯誤需要retry,Retry策略被呼叫來決定retry是要要執行,執行多少次才放棄。
很方便地,RetryLoop 提供了一個靜態方法使用Callable來執行一個完整retry loop。
123456789 | RetryLoop.callWithRetry(client, new Callable<Void>(){ @Override public Void call() throws Exception { // do your work here - it will get retried if needed return null; }}); |
Retry策略
retry策略可以改變retry的行為。 它抽象出RetryPolicy介面, 包含一個方法public
boolean allowRetry(int retryCount, long elapsedTimeMs);。 在retry被嘗試執行前, allowRetry()被呼叫,並且將當前的重試次數和操作已用時間作為引數. 如果返回true, retry被執行。否則異常被丟擲。
Curator本身提供了幾個策略(在 com.netflix.curator.retry 包下):
Policy Name | Description |
---|---|
ExponentialBackoffRetry | 重試一定次數,每次重試sleep更多的時間 |
RetryNTimes | 重試N次 |
RetryOneTime | 重試一次 |
RetryUntilElapsed | 重試一定的時間 |