【hadoop zookeeper】Zookeeper開源客戶端框架Curator簡介
阿新 • • 發佈:2019-01-30
Curator是Netflix開源的一套ZooKeeper客戶端框架. Netflix在使用ZooKeeper的過程中發現ZooKeeper自帶的客戶端太底層, 應用方在使用的時候需要自己處理很多事情, 於是在它的基礎上包裝了一下, 提供了一套更好用的客戶端框架. Netflix在用ZooKeeper的過程中遇到的問題,
我們也遇到了, 所以開始研究一下, 首先從他在github上的原始碼, wiki文件以及Netflix的技術blog入手.
看完官方的文件之後, 發現Curator主要解決了三類問題:
Curator列舉的ZooKeeper使用過程中的幾個問題
初始化連線的問題: 在client與server之間握手建立連線的過程中, 如果握手失敗, 執行所有的同步方法(比如create, getData等)將丟擲異常
自動恢復(failover)的問題: 當client與一臺server的連線丟失,並試圖去連線另外一臺server時, client將回到初始連線模式
session過期的問題: 在極端情況下, 出現ZooKeeper session過期, 客戶端需要自己去監聽該狀態並重新建立ZooKeeper例項 .
對可恢復異常的處理:當在server端建立一個有序ZNode, 而在將節點名返回給客戶端時崩潰, 此時client端丟擲可恢復的異常, 使用者需要自己捕獲這些異常並進行重試
使用場景的問題:Zookeeper提供了一些標準的使用場景支援, 但是ZooKeeper對這些功能的使用說明文件很少, 而且很容易用錯. 在一些極端場景下如何處理, zk並沒有給出詳細的文件說明. 比如共享鎖服務, 當伺服器端建立臨時順序節點成功, 但是在客戶端接收到節點名之前掛掉了, 如果不能很好的處理這種情況, 將導致死鎖.
Curator主要從以下幾個方面降低了zk使用的複雜性:
重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略, 並且內部也提供了幾種標準的重試策略(比如指數補償).
連線狀態監控: Curator初始化之後會一直的對zk連線進行監聽, 一旦發現連線狀態發生變化, 將作出相應的處理.
zk客戶端例項管理:Curator對zk客戶端到server叢集連線進行管理. 並在需要的情況, 重建zk例項, 保證與zk叢集的可靠連線
各種使用場景支援:Curator實現zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景), 這些實現都遵循了zk的最佳實踐, 並考慮了各種極端情況.
Curator通過以上的處理, 讓使用者專注於自身的業務本身, 而無需花費更多的精力在zk本身.
Curator聲稱的一些亮點:
日誌工具
內部採用SLF4J 來輸出日誌
採用驅動器(driver)機制, 允許擴充套件和定製日誌和跟蹤處理
提供了一個TracerDriver介面, 通過實現addTrace()和addCount()介面來整合使用者自己的跟蹤框架
和Curator相比, 另一個ZooKeeper客戶端——zkClient(https://github.com/sgroschupf/zkclient)的不足之處:
文件幾乎沒有
異常處理弱爆了(簡單的丟擲RuntimeException)
重試處理太難用了
沒有提供各種使用場景的實現
對ZooKeeper自帶客戶端(ZooKeeper類)的"抱怨":
只是一個底層實現
要用需要自己寫大量的程式碼
很容易誤用
需要自己處理連線丟失, 重試等
Curator幾個組成部分
Client
這是一個底層的API, 應用方基本對這個可以無視, 最好直接從Curator Framework入手
主要包括三部分:
不間斷連線管理
連線重試處理
Retry Loop(迴圈重試)
一種典型的用法:
Java程式碼
如果在操作過程中失敗, 且這種失敗是可重試的, 而且在允許的次數內, Curator將保證操作的最終完成.
另一種使用Callable介面的重試做法:
Java程式碼
重試策略
RetryPolicy介面只有一個方法(以前版本有兩個方法):
public boolean allowRetry(int retryCount, long elapsedTimeMs);
在開始重試之前, allowRetry方法被呼叫, 其引數將指定當前重試次數, 和操作已消耗時間. 如果允許, 將繼續重試, 否則丟擲異常.
Curator內部實現的幾種重試策略:
Framework
是ZooKeeper Client更高的抽象API
自動連線管理: 當ZooKeeper客戶端內部出現異常, 將自動進行重連或重試, 該過程對外幾乎完全透明
更清晰的API: 簡化了ZooKeeper原生的方法, 事件等, 提供流程的介面
CuratorFrameworkFactory類提供了兩個方法, 一個工廠方法newClient, 一個構建方法build. 使用工廠方法newClient可以建立一個預設的例項, 而build構建方法可以對例項進行定製. 當CuratorFramework例項構建完成, 緊接著呼叫start()方法, 在應用結束的時候, 需要呼叫close()方法. CuratorFramework是執行緒安全的. 在一個應用中可以共享同一個zk叢集的CuratorFramework.
CuratorFramework API採用了連貫風格的介面(Fluent Interface). 所有的操作一律返回構建器, 當所有元素加在一起之後, 整個方法看起來就像一個完整的句子. 比如下面的操作:
Java程式碼
方法說明:
通知(Notification)
Curator的相關程式碼已經更新了, 裡面的介面已經由ClientListener改成CuratorListener了, 而且介面中去掉了clientCloseDueToError方法. 只有一個方法:
eventReceived() 當一個後臺操作完成或者指定的watch被觸發時該方法被呼叫
UnhandledErrorListener介面用來對異常進行處理.
CuratorEvent(在以前版本為ClientEvent)是對各種操作觸發相關事件物件(POJO)的一個完整封裝, 而事件物件的內容跟事件型別相關, 下面是對應關係:
名稱空間(Namespace)
因為一個zk叢集會被多個應用共享, 為了避免各個應用的zk patch衝突, Curator Framework內部會給每一個Curator Framework例項分配一個namespace(可選). 這樣你在create ZNode的時候都會自動加上這個namespace作為這個node path的root. 使用程式碼如下:
Java程式碼
Recipe
Curator實現ZooKeeper的所有recipe(除了兩段提交)
選舉
叢集領導選舉(leader election)
鎖服務
共享鎖: 全域性同步分散式鎖, 同一時間兩臺機器只有一臺能獲得同一把鎖.
共享讀寫鎖: 用於分散式的讀寫互斥處理, 同時生成兩個鎖:一個讀鎖, 一個寫鎖, 讀鎖能被多個應用持有, 而寫鎖只能一個獨佔, 當寫鎖未被持有時, 多個讀鎖持有者可以同時進行讀操作
共享訊號量: 在分散式系統中的各個JVM使用同一個zk lock path, 該path將跟一個給定數量的租約(lease)相關聯, 然後各個應用根據請求順序獲得對應的lease, 相對來說, 這是最公平的鎖服務使用方式.
多共享鎖:內部構件多個共享鎖(會跟一個znode path關聯), 在acquire()過程中, 執行所有共享鎖的acquire()方法, 如果中間出現一個失敗, 則將釋放所有已require的共享鎖; 執行release()方法時, 則執行內部多個共享鎖的release方法(如果出現失敗將忽略)
佇列(Queue)
分散式佇列:採用持久順序zk node來實現FIFO佇列, 如果有多個消費者, 可以使用LeaderSelector來保證佇列的消費者順序
分散式優先佇列: 優先佇列的分散式版本
BlockingQueueConsumer: JDK阻塞佇列的分散式版本
關卡(Barrier)
分散式關卡:一堆客戶端去處理一堆任務, 只有所有的客戶端都執行完, 所有客戶端才能繼續往下處理
雙分散式關卡:同時開始, 同時結束
計數器(Counter)
共享計數器:所有客戶端監聽同一個znode path, 並共享一個最新的integer計數值
分散式AtomicLong(AtomicInteger): AtomicXxx的分散式版本, 先採用樂觀鎖更新, 若失敗再採用互斥鎖更新, 可以配置重試策略來處理重試
工具類
Path Cache
Path Cache用於監聽ZNode的子節點的變化, 當add, update, remove子節點時將改變Path Cache state, 同時返回所有子節點的data和state.
Curator中採用了PathChildrenCache類來處理Path Cache, 狀態的變化則採用PathChildrenCacheListener來監聽.
相關用法參見TestPathChildrenCache測試類
注意: 當zk server的資料發生變化, zk client會出現不一致, 這個需要通過版本號來識別這種狀態的變化
Test Server
用來在測試中模擬一個本地程序內ZooKeeper Server.
Test Cluster
用來在測試中模擬一個ZooKeeper Server叢集
ZKPaths工具類
提供了和ZNode相關的path處理工具方法:
EnsurePath工具類
直接看例子, 具體的說就是呼叫多次, 只會執行一次建立節點操作.
Java程式碼
Notification事件處理
Curator對ZooKeeper的事件Watcher進行了封裝處理, 然後實現了一套監聽機制. 提供了幾個監聽介面用來處理ZooKeeper連線狀態的變化
當連接出現異常, 將通過ConnectionStateListener介面進行監聽, 並進行相應的處理, 這些狀態變化包括:
從com.netflix.curator.framework.imps.CuratorFrameworkImpl.validateConnection(CuratorEvent)方法中我們可以知道, Curator分別將ZooKeeper的Disconnected, Expired, SyncConnected三種狀態轉換成上面三種狀態.
參考
from :http://macrochen.iteye.com/blog/1366136
看完官方的文件之後, 發現Curator主要解決了三類問題:
- 封裝ZooKeeper client與ZooKeeper server之間的連線處理;
- 提供了一套Fluent風格的操作API;
- 提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 叢集領導選舉機制)的抽象封裝.
Curator列舉的ZooKeeper使用過程中的幾個問題
初始化連線的問題: 在client與server之間握手建立連線的過程中, 如果握手失敗, 執行所有的同步方法(比如create, getData等)將丟擲異常
自動恢復(failover)的問題: 當client與一臺server的連線丟失,並試圖去連線另外一臺server時, client將回到初始連線模式
session過期的問題: 在極端情況下, 出現ZooKeeper session過期, 客戶端需要自己去監聽該狀態並重新建立ZooKeeper例項 .
對可恢復異常的處理:當在server端建立一個有序ZNode, 而在將節點名返回給客戶端時崩潰, 此時client端丟擲可恢復的異常, 使用者需要自己捕獲這些異常並進行重試
使用場景的問題:Zookeeper提供了一些標準的使用場景支援, 但是ZooKeeper對這些功能的使用說明文件很少, 而且很容易用錯. 在一些極端場景下如何處理, zk並沒有給出詳細的文件說明. 比如共享鎖服務, 當伺服器端建立臨時順序節點成功, 但是在客戶端接收到節點名之前掛掉了, 如果不能很好的處理這種情況, 將導致死鎖.
Curator主要從以下幾個方面降低了zk使用的複雜性:
重試機制:提供可插拔的重試機制, 它將給捕獲所有可恢復的異常配置一個重試策略, 並且內部也提供了幾種標準的重試策略(比如指數補償).
連線狀態監控: Curator初始化之後會一直的對zk連線進行監聽, 一旦發現連線狀態發生變化, 將作出相應的處理.
zk客戶端例項管理:Curator對zk客戶端到server叢集連線進行管理. 並在需要的情況, 重建zk例項, 保證與zk叢集的可靠連線
各種使用場景支援:Curator實現zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景), 這些實現都遵循了zk的最佳實踐, 並考慮了各種極端情況.
Curator通過以上的處理, 讓使用者專注於自身的業務本身, 而無需花費更多的精力在zk本身.
Curator聲稱的一些亮點:
日誌工具
內部採用SLF4J 來輸出日誌
採用驅動器(driver)機制, 允許擴充套件和定製日誌和跟蹤處理
提供了一個TracerDriver介面, 通過實現addTrace()和addCount()介面來整合使用者自己的跟蹤框架
和Curator相比, 另一個ZooKeeper客戶端——zkClient(https://github.com/sgroschupf/zkclient)的不足之處:
文件幾乎沒有
異常處理弱爆了(簡單的丟擲RuntimeException)
重試處理太難用了
沒有提供各種使用場景的實現
對ZooKeeper自帶客戶端(ZooKeeper類)的"抱怨":
只是一個底層實現
要用需要自己寫大量的程式碼
很容易誤用
需要自己處理連線丟失, 重試等
Curator幾個組成部分
- Client: 是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法.
- Framework: 用來簡化ZooKeeper高階功能的使用, 並增加了一些新的功能, 比如管理到ZooKeeper叢集的連線, 重試處理
- Recipes: 實現了通用ZooKeeper的recipe, 該元件建立在Framework的基礎之上
- Utilities:各種ZooKeeper的工具類
- Errors: 異常處理, 連線, 恢復等.
- Extensions: recipe擴充套件
Client
這是一個底層的API, 應用方基本對這個可以無視, 最好直接從Curator Framework入手
主要包括三部分:
不間斷連線管理
連線重試處理
Retry Loop(迴圈重試)
一種典型的用法:
Java程式碼
- RetryLoop retryLoop = client.newRetryLoop();
- while ( retryLoop.shouldContinue() )
- {
- try
- {
- // perform your work
- ...
- // it's important to re-get the ZK instance as there may have been an error and the instance was re-created
- ZooKeeper zk = client.getZookeeper();
- retryLoop.markComplete();
- }
- catch ( Exception e )
- {
- retryLoop.takeException(e);
- }
- }
如果在操作過程中失敗, 且這種失敗是可重試的, 而且在允許的次數內, Curator將保證操作的最終完成.
另一種使用Callable介面的重試做法:
Java程式碼
- RetryLoop.callWithRetry(client, new Callable()
- {
- @Override
- public Void call() throws Exception
- {
- // do your work here - it will get retried if needed
- return null;
- }
- });
重試策略
RetryPolicy介面只有一個方法(以前版本有兩個方法):
public boolean allowRetry(int retryCount, long elapsedTimeMs);
在開始重試之前, allowRetry方法被呼叫, 其引數將指定當前重試次數, 和操作已消耗時間. 如果允許, 將繼續重試, 否則丟擲異常.
Curator內部實現的幾種重試策略:
- ExponentialBackoffRetry:重試指定的次數, 且每一次重試之間停頓的時間逐漸增加.
- RetryNTimes:指定最大重試次數的重試策略
- RetryOneTime:僅重試一次
- RetryUntilElapsed:一直重試直到達到規定的時間
Framework
是ZooKeeper Client更高的抽象API
自動連線管理: 當ZooKeeper客戶端內部出現異常, 將自動進行重連或重試, 該過程對外幾乎完全透明
更清晰的API: 簡化了ZooKeeper原生的方法, 事件等, 提供流程的介面
CuratorFrameworkFactory類提供了兩個方法, 一個工廠方法newClient, 一個構建方法build. 使用工廠方法newClient可以建立一個預設的例項, 而build構建方法可以對例項進行定製. 當CuratorFramework例項構建完成, 緊接著呼叫start()方法, 在應用結束的時候, 需要呼叫close()方法. CuratorFramework是執行緒安全的. 在一個應用中可以共享同一個zk叢集的CuratorFramework.
CuratorFramework API採用了連貫風格的介面(Fluent Interface). 所有的操作一律返回構建器, 當所有元素加在一起之後, 整個方法看起來就像一個完整的句子. 比如下面的操作:
Java程式碼
- client.create().forPath("/head", new byte[0]);
- client.delete().inBackground().forPath("/head");
- client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
- client.getData().watched().inBackground().forPath("/test");
方法說明:
- create(): 發起一個create操作. 可以組合其他方法 (比如mode 或background) 最後以forPath()方法結尾
- delete(): 發起一個刪除操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾
- checkExists(): 發起一個檢查ZNode 是否存在的操作. 可以組合其他方法(watch 或background) 最後以forPath()方法結尾
- getData(): 發起一個獲取ZNode資料的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾
- setData(): 發起一個設定ZNode資料的操作. 可以組合其他方法(version 或background) 最後以forPath()方法結尾
- getChildren(): 發起一個獲取ZNode子節點的操作. 可以組合其他方法(watch, background 或get stat) 最後以forPath()方法結尾
- inTransaction(): 發起一個ZooKeeper事務. 可以組合create, setData, check, 和/或delete 為一個操作, 然後commit() 提交
通知(Notification)
Curator的相關程式碼已經更新了, 裡面的介面已經由ClientListener改成CuratorListener了, 而且介面中去掉了clientCloseDueToError方法. 只有一個方法:
eventReceived() 當一個後臺操作完成或者指定的watch被觸發時該方法被呼叫
UnhandledErrorListener介面用來對異常進行處理.
CuratorEvent(在以前版本為ClientEvent)是對各種操作觸發相關事件物件(POJO)的一個完整封裝, 而事件物件的內容跟事件型別相關, 下面是對應關係:
CREATE | getResultCode() and getPath() |
DELETE | getResultCode() and getPath() |
EXISTS | getResultCode(), getPath() and getStat() |
GET_DATA | getResultCode(), getPath(), getStat() and getData() |
SET_DATA | getResultCode(), getPath() and getStat() |
CHILDREN | getResultCode(), getPath(), getStat(), getChildren() |
WATCHED | getWatchedEvent() |
名稱空間(Namespace)
因為一個zk叢集會被多個應用共享, 為了避免各個應用的zk patch衝突, Curator Framework內部會給每一個Curator Framework例項分配一個namespace(可選). 這樣你在create ZNode的時候都會自動加上這個namespace作為這個node path的root. 使用程式碼如下:
Java程式碼
- CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
- …
- client.create().forPath("/test", data);
- // node was actually written to: "/MyApp/test"
Recipe
Curator實現ZooKeeper的所有recipe(除了兩段提交)
選舉
叢集領導選舉(leader election)
鎖服務
共享鎖: 全域性同步分散式鎖, 同一時間兩臺機器只有一臺能獲得同一把鎖.
共享讀寫鎖: 用於分散式的讀寫互斥處理, 同時生成兩個鎖:一個讀鎖, 一個寫鎖, 讀鎖能被多個應用持有, 而寫鎖只能一個獨佔, 當寫鎖未被持有時, 多個讀鎖持有者可以同時進行讀操作
共享訊號量: 在分散式系統中的各個JVM使用同一個zk lock path, 該path將跟一個給定數量的租約(lease)相關聯, 然後各個應用根據請求順序獲得對應的lease, 相對來說, 這是最公平的鎖服務使用方式.
多共享鎖:內部構件多個共享鎖(會跟一個znode path關聯), 在acquire()過程中, 執行所有共享鎖的acquire()方法, 如果中間出現一個失敗, 則將釋放所有已require的共享鎖; 執行release()方法時, 則執行內部多個共享鎖的release方法(如果出現失敗將忽略)
佇列(Queue)
分散式佇列:採用持久順序zk node來實現FIFO佇列, 如果有多個消費者, 可以使用LeaderSelector來保證佇列的消費者順序
分散式優先佇列: 優先佇列的分散式版本
BlockingQueueConsumer: JDK阻塞佇列的分散式版本
關卡(Barrier)
分散式關卡:一堆客戶端去處理一堆任務, 只有所有的客戶端都執行完, 所有客戶端才能繼續往下處理
雙分散式關卡:同時開始, 同時結束
計數器(Counter)
共享計數器:所有客戶端監聽同一個znode path, 並共享一個最新的integer計數值
分散式AtomicLong(AtomicInteger): AtomicXxx的分散式版本, 先採用樂觀鎖更新, 若失敗再採用互斥鎖更新, 可以配置重試策略來處理重試
工具類
Path Cache
Path Cache用於監聽ZNode的子節點的變化, 當add, update, remove子節點時將改變Path Cache state, 同時返回所有子節點的data和state.
Curator中採用了PathChildrenCache類來處理Path Cache, 狀態的變化則採用PathChildrenCacheListener來監聽.
相關用法參見TestPathChildrenCache測試類
注意: 當zk server的資料發生變化, zk client會出現不一致, 這個需要通過版本號來識別這種狀態的變化
Test Server
用來在測試中模擬一個本地程序內ZooKeeper Server.
Test Cluster
用來在測試中模擬一個ZooKeeper Server叢集
ZKPaths工具類
提供了和ZNode相關的path處理工具方法:
- getNodeFromPath: 根據給定path獲取node name. i.e. "/one/two/three" -> "three"
- mkdirs: 根據給定路徑遞迴建立所有node
- getSortedChildren: 根據給定路徑, 返回一個按序列號排序的子節點列表
- makePath: 根據給定的path和子節點名, 建立一個完整path
EnsurePath工具類
直接看例子, 具體的說就是呼叫多次, 只會執行一次建立節點操作.
Java程式碼
- EnsurePath ensurePath = new EnsurePath(aFullPathToEnsure);
- ...
- String nodePath = aFullPathToEnsure + "/foo";
- ensurePath.ensure(zk); // first time syncs and creates if needed
- zk.create(nodePath, ...);
- ...
- ensurePath.ensure(zk); // subsequent times are NOPs
- zk.create(nodePath, ...);
Notification事件處理
Curator對ZooKeeper的事件Watcher進行了封裝處理, 然後實現了一套監聽機制. 提供了幾個監聽介面用來處理ZooKeeper連線狀態的變化
當連接出現異常, 將通過ConnectionStateListener介面進行監聽, 並進行相應的處理, 這些狀態變化包括:
- 暫停(SUSPENDED): 當連線丟失, 將暫停所有操作, 直到連線重新建立, 如果在規定時間內無法建立連線, 將觸發LOST通知
- 重連(RECONNECTED): 連線丟失, 執行重連時, 將觸發該通知
- 丟失(LOST): 連線超時時, 將觸發該通知
從com.netflix.curator.framework.imps.CuratorFrameworkImpl.validateConnection(CuratorEvent)方法中我們可以知道, Curator分別將ZooKeeper的Disconnected, Expired, SyncConnected三種狀態轉換成上面三種狀態.
參考
from :http://macrochen.iteye.com/blog/1366136