跟著例項學習ZooKeeper的用法: Curator框架應用
前面的幾篇文章介紹了一些ZooKeeper的應用方法, 本文將介紹Curator訪問ZooKeeper的一些基本方法, 而不僅僅限於指定的Recipes, 你可以使用Curator API任意的訪問ZooKeeper。
CuratorFramework
Curator框架提供了一套高階的API, 簡化了ZooKeeper的操作。 它增加了很多使用ZooKeeper開發的特性,可以處理ZooKeeper叢集複雜的連線管理和重試機制。 這些特性包括:
- 自動化的連線管理: 重新建立到ZooKeeper的連線和重試機制存在一些潛在的錯誤case。 Curator幫助你處理這些事情,對你來說是透明的。
- 清理API:
- 簡化了原生的ZooKeeper的方法,事件等
- 提供了一個現代的流式介面
- 提供了Recipes實現: 如前面的文章介紹的那樣,基於這些Recipes可以建立很多複雜的分散式應用
Curator框架通過CuratorFrameworkFactory以工廠模式和builder模式建立CuratorFramework實 例。 CuratorFramework例項都是執行緒安全的,你應該在你的應用中共享同一個CuratorFramework例項.
工廠方法newClient()提供了一個簡單方式建立例項。 而Builder提供了更多的引數控制。一旦你建立了一個CuratorFramework例項,你必須呼叫它的start()啟動,在應用退出時呼叫close()方法關閉.
下面的例子演示了兩種建立Curator的方法:
package com.colobu.zkrecipe.framework;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class CreateClientExample {
private static final String PATH = "/example/basic";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
try {
client = createSimple(server.getConnectString());
client.start();
client.create().creatingParentsIfNeeded().forPath(PATH, "test".getBytes());
CloseableUtils.closeQuietly(client);
client = createWithOptions(server.getConnectString(), new ExponentialBackoffRetry(1000, 3), 1000, 1000);
client.start();
System.out.println(new String(client.getData().forPath(PATH)));
} catch (Exception ex) {
ex.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
public static CuratorFramework createSimple(String connectionString) {
// these are reasonable arguments for the ExponentialBackoffRetry.
// The first retry will wait 1 second - the second will wait up to 2 seconds - the
// third will wait up to 4 seconds.
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
// The simplest way to get a CuratorFramework instance. This will use default values.
// The only required arguments are the connection string and the retry policy
return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
}
public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
// using the CuratorFrameworkFactory.builder() gives fine grained control
// over creation options. See the CuratorFrameworkFactory.Builder javadoc details
return CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
// etc. etc.
.build();
}
}
Curator框架提供了一種流式介面。 操作通過builder串聯起來, 這樣方法呼叫類似語句一樣。
client.create().forPath("/head", new byte[0]);
client.delete().inBackground().forPath("/head");
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);
client.getData().watched().inBackground().forPath("/test");
CuratorFramework提供的方法:
方法名 | 描述 |
---|---|
create() | 開始建立操作, 可以呼叫額外的方法(比如方式mode 或者後臺執行background) 並在最後呼叫forPath()指定要操作的ZNode |
delete() | 開始刪除操作. 可以呼叫額外的方法(版本或者後臺處理version or background)並在最後呼叫forPath()指定要操作的ZNode |
checkExists() | 開始檢查ZNode是否存在的操作. 可以呼叫額外的方法(監控或者後臺處理)並在最後呼叫forPath()指定要操作的ZNode |
getData() | 開始獲得ZNode節點資料的操作. 可以呼叫額外的方法(監控、後臺處理或者獲取狀態watch, background or get stat) 並在最後呼叫forPath()指定要操作的ZNode |
setData() | 開始設定ZNode節點資料的操作. 可以呼叫額外的方法(版本或者後臺處理) 並在最後呼叫forPath()指定要操作的ZNode |
getChildren() | 開始獲得ZNode的子節點列表。 以呼叫額外的方法(監控、後臺處理或者獲取狀態watch, background or get stat) 並在最後呼叫forPath()指定要操作的ZNode |
inTransaction() | 開始是原子ZooKeeper事務. 可以複合create, setData, check, and/or delete 等操作然後呼叫commit()作為一個原子操作提交 |
後臺操作的通知和監控可以通過ClientListener介面釋出. 你可以在CuratorFramework例項上通過addListener()
註冊listener, Listener實現了下面的方法:
- eventReceived() 一個後臺操作完成或者一個監控被觸發
事件型別以及事件的方法如下:
Event Type | Event Methods |
---|---|
CREATE | getResultCode() and getPath() |
DELETE | getResultCode() and getPath() |
EXISTS | getResultCode(), getPath() and getStat() |
GETDATA | getResultCode(), getPath(), getStat() and getData() |
SETDATA | getResultCode(), getPath() and getStat() |
CHILDREN | getResultCode(), getPath(), getStat(), getChildren() |
WATCHED | getWatchedEvent() |
還可以通過ConnectionStateListener
介面監控連線的狀態。 強烈推薦你增加這個監控器。
你可以使用名稱空間Namespace避免多個應用的節點的名稱衝突。 CuratorFramework提供了名稱空間的概念,這樣CuratorFramework會為它的API呼叫的path加上名稱空間:
CuratorFramework client = CuratorFrameworkFactory.builder().namespace("MyApp") ... build();
...
client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"
建立builder時不是呼叫build()
而是呼叫buildTemp()
。 3分鐘不活動連線就被關閉,你也可以指定不活動的時間。 它只提供了下面幾個方法:
public void close();
public CuratorTransaction inTransaction() throws Exception;
public TempGetDataBuilder getData() throws Exception;
操作方法
上面的表格列出了CuratorFramework可以用的操作。 下面就是一個例子:
package com.colobu.zkrecipe.framework;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
public class CrudExample {
public static void main(String[] args) {
}
public static void create(CuratorFramework client, String path, byte[] payload) throws Exception {
// this will create the given ZNode with the given data
client.create().forPath(path, payload);
}
public static void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
// this will create the given EPHEMERAL ZNode with the given data
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}
public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload) throws Exception {
// this will create the given EPHEMERAL-SEQUENTIAL ZNode with the given
// data using Curator protection.
return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
}
public static void setData(CuratorFramework client, String path, byte[] payload) throws Exception {
// set data for the given node
client.setData().forPath(path, payload);
}
public static void setDataAsync(CuratorFramework client, String path, byte[] payload) throws Exception {
// this is one method of getting event/async notifications
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
// examine event for details
}
};
client.getCuratorListenable().addListener(listener);
// set data for the given node asynchronously. The completion
// notification
// is done via the CuratorListener.
client.setData().inBackground().forPath(path, payload);
}
public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
// this is another method of getting notification of an async completion
client.setData().inBackground(callback).forPath(path, payload);
}
public static void delete(CuratorFramework client, String path) throws Exception {
// delete the given node
client.delete().forPath(path);
}
public static void guaranteedDelete(CuratorFramework client, String path) throws Exception {
// delete the given node and guarantee that it completes
client.delete().guaranteed().forPath(path);
}
public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
/**
* Get children and set a watcher on the node. The watcher notification
* will come through the CuratorListener (see setDataAsync() above).
*/
return client.getChildren().watched().forPath(path);
}
public static List<String> watchedGetChildren(CuratorFramework client, String path, Watcher watcher) throws Exception {
/**
* Get children and set the given watcher on the node.
*/
return client.getChildren().usingWatcher(watcher).forPath(path);
}
}
事務
上面也提到, CuratorFramework提供了事務的概念,可以將一組操作放在一個原子事務中。 什麼叫事務? 事務是原子的, 一組操作要麼都成功,要麼都失敗。
下面的例子演示了事務的操作:
package com.colobu.zkrecipe.framework;
import java.util.Collection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
public class TransactionExample {
public static void main(String[] args) {
}
public static Collection<CuratorTransactionResult> transaction(CuratorFramework client) throws Exception {
// this example shows how to use ZooKeeper's new transactions
Collection<CuratorTransactionResult> results = client.inTransaction().create().forPath("/a/path", "some data".getBytes())
.and().setData().forPath("/another/path", "other data".getBytes())
.and().delete().forPath("/yet/another/path")
.and().commit(); // IMPORTANT!
// called
for (CuratorTransactionResult result : results) {
System.out.println(result.getForPath() + " - " + result.getType());
}
return results;
}
/*
* These next four methods show how to use Curator's transaction APIs in a
* more traditional - one-at-a-time - manner
*/
public static CuratorTransaction startTransaction(CuratorFramework client) {
// start the transaction builder
return client.inTransaction();
}
public static CuratorTransactionFinal addCreateToTransaction(CuratorTransaction transaction) throws Exception {
// add a create operation
return transaction.create().forPath("/a/path", "some data".getBytes()).and();
}
public static CuratorTransactionFinal addDeleteToTransaction(CuratorTransaction transaction) throws Exception {
// add a delete operation
return transaction.delete().forPath("/another/path").and();
}
public static void commitTransaction(CuratorTransactionFinal transaction) throws Exception {
// commit the transaction
transaction.commit();
}
}