zookeeper使用(三)--開源客戶端
一、前言
上一篇部落格已經介紹瞭如何使用Zookeeper提供的原生態Java API進行操作,本篇博文主要講解如何通過開源客戶端來進行操作。
二、ZkClient
ZkClient是在Zookeeper原聲API介面之上進行了包裝,是一個更易用的Zookeeper客戶端,其內部還實現了諸如Session超時重連、Watcher反覆註冊等功能。
2.1 新增依賴
在pom.xml檔案中新增如下內容即可。
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
2.2 建立會話
使用ZkClient可以輕鬆的建立會話,連線到服務端。
package com.hust.grid.leesf.zkclient.examples; import java.io.IOException; import org.I0Itec.zkclient.ZkClient; public class Create_Session_Sample { public static void main(String[] args) throws IOException, InterruptedException { ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); System.out.println("ZooKeeper session established."); } }
執行結果:
ZooKeeper session established.
結果表明已經成功建立會話。
2.3 建立節點
ZkClient提供了遞迴建立節點的介面,即其幫助開發者完成父節點的建立,再建立子節點。
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Create_Node_Sample { public static void main(String[] args) throws Exception { ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); String path = "/zk-book/c1"; zkClient.createPersistent(path, true); System.out.println("success create znode."); } }
執行結果:
success create znode.
結果表明已經成功建立了節點,值得注意的是,在原生態介面中是無法建立成功的(父節點不存在),但是通過ZkClient可以遞迴的先建立父節點,再建立子節點。
可以看到確實成功建立了/zk-book和/zk-book/c1兩個節點。
2.4 刪除節點
ZkClient提供了遞迴刪除節點的介面,即其幫助開發者先刪除所有子節點(存在),再刪除父節點。
package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Del_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.createPersistent(path, "");
zkClient.createPersistent(path+"/c1", "");
zkClient.deleteRecursive(path);
System.out.println("success delete znode.");
}
}
執行結果:
success delete znode.
結果表明ZkClient可直接刪除帶子節點的父節點,因為其底層先刪除其所有子節點,然後再刪除父節點。
2.5 獲取子節點
package com.hust.grid.leesf.zkclient.examples;
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
public class Get_Children_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.subscribeChildChanges(path, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
}
});
zkClient.createPersistent(path);
Thread.sleep(1000);
zkClient.createPersistent(path + "/c1");
Thread.sleep(1000);
zkClient.delete(path + "/c1");
Thread.sleep(1000);
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
執行結果:
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
結果表明:
客戶端可以對一個不存在的節點進行子節點變更的監聽。
一旦客戶端對一個節點註冊了子節點列表變更監聽之後,那麼當該節點的子節點列表發生變更時,服務端都會通知客戶端,並將最新的子節點列表傳送給客戶端
該節點本身的建立或刪除也會通知到客戶端。
2.6 獲取資料
package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class Get_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.createEphemeral(path, "123");
zkClient.subscribeDataChanges(path, new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("Node " + dataPath + " deleted.");
}
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("Node " + dataPath + " changed, new data: " + data);
}
});
System.out.println(zkClient.readData(path));
zkClient.writeData(path, "456");
Thread.sleep(1000);
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
執行結果:
123
Node /zk-book changed, new data: 456
Node /zk-book deleted.
結果表明可以成功監聽節點資料變化或刪除事件。
2.7 檢測節點是否存在
package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;
public class Exist_Node_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000);
System.out.println("Node " + path + " exists " + zkClient.exists(path));
}
}
執行結果:
Node /zk-book exists false
結果表明,可以通過ZkClient輕易檢測節點是否存在,其相比於原生態的介面更易於理解。
三、Curator客戶端
Curator解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連線重連,反覆註冊Watcher和NodeExistsException異常等,現已成為Apache的頂級專案。
3.1 新增依賴
在pom.xml檔案中新增如下內容即可。
<!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
3.2 建立會話
Curator除了使用一般方法建立會話外,還可以使用fluent風格進行建立。
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Create_Session_Sample {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
client.start();
System.out.println("Zookeeper session1 established. ");
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();
client1.start();
System.out.println("Zookeeper session2 established. ");
}
}
執行結果:
Zookeeper session1 established.
Zookeeper session2 established.
值得注意的是session2會話含有隔離名稱空間,即客戶端對Zookeeper上資料節點的任何操作都是相對/base目錄進行的,這有利於實現不同的Zookeeper的業務之間的隔離。
3.3 建立節點
通過使用Fluent風格的介面,開發人員可以進行自由組合來完成各種型別節點的建立。
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class Create_Node_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book/c1";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
System.out.println("success create znode: " + path);
}
}
執行結果:
success create znode: /zk-book/c1
其中,也建立了/zk-book/c1的父節點/zk-book節點。
3.4 刪除節點
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class Del_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book/c1";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
Stat stat = new Stat();
System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
System.out.println("success delete znode " + path);
Thread.sleep(Integer.MAX_VALUE);
}
}
執行結果:
init
success delete znode /zk-book/c1
結果表明成功刪除/zk-book/c1節點。
3.5 獲取資料
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class Get_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
Stat stat = new Stat();
System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
}
}
執行結果:
init
結果表明成功獲取了節點的資料。
3.6 更新資料
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class Set_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/zk-book";
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("Success set node for : " + path + ", new version: "
+ client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
try {
client.setData().withVersion(stat.getVersion()).forPath(path);
} catch (Exception e) {
System.out.println("Fail set node due to " + e.getMessage());
}
}
}
執行結果:
Success set node for : /zk-book, new version: 1
Fail set node due to KeeperErrorCode = BadVersion for /zk-book
結果表明當攜帶資料版本不一致時,無法完成更新操作。
3.7 非同步介面
如同Zookeeper原生API提供了非同步介面,Curator也提供了非同步介面。在Zookeeper中,所有的非同步通知事件處理都是由EventThread這個執行緒來處理的,EventThread執行緒用於序列處理所有的事件通知,其可以保證對事件處理的順序性,但是一旦碰上覆雜的處理單元,會消耗過長的處理時間,從而影響其他事件的處理,Curator允許使用者傳入Executor例項,這樣可以將比較複雜的事件處理放到一個專門的執行緒池中去。
package com.hust.grid.leesf.curator.examples;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class Create_Node_Background_Sample {
static String path = "/zk-book";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
static CountDownLatch semaphore = new CountDownLatch(2);
static ExecutorService tp = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws Exception {
client.start();
System.out.println("Main thread: " + Thread.currentThread().getName());
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
System.out.println();
semaphore.countDown();
}
}, tp).forPath(path, "init".getBytes());
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}).forPath(path, "init".getBytes());
semaphore.await();
tp.shutdown();
}
}
執行結果:
Main thread: main
event[code: -110, type: CREATE], Thread of processResult: main-EventThread
event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1
其中,建立節點的事件由執行緒池自己處理,而非預設執行緒處理。
Curator除了提供很便利的API,還提供了一些典型的應用場景,開發人員可以使用參考更好的理解如何使用Zookeeper客戶端,所有的都在recipes包中,只需要在pom.xml中新增如下依賴即可
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
3.8 節點監聽
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class NodeCache_Sample {
static String path = "/zk-book/nodecache";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
final NodeCache cache = new NodeCache(client, path, false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
}
});
client.setData().forPath(path, "u".getBytes());
Thread.sleep(1000);
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
執行結果:
Node data update, new data: u
當節點資料變更後收到了通知。NodeCache不僅可以監聽資料節點的內容變更,也能監聽指定節點是否存在。
3.9 子節點監聽
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class PathChildrenCache_Sample {
static String path = "/zk-book";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(5000).build();
public static void main(String[] args) throws Exception {
client.start();
PathChildrenCache cache = new PathChildrenCache(client, path, true);
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED," + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED," + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED," + event.getData().getPath());
break;
default:
break;
}
}
});
client.create().withMode(CreateMode.PERSISTENT).forPath(path);
Thread.sleep(1000);
client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path + "/c1");
Thread.sleep(1000);
client.delete().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
執行結果:
CHILD_ADDED,/zk-book/c1
CHILD_REMOVED,/zk-book/c1
監聽節點的子節點,包括新增、資料變化、刪除三類事件。
3.10 Master選舉
藉助Zookeeper,開發者可以很方便地實現Master選舉功能,其大體思路如下:選擇一個根節點,如/master_select,多臺機器同時向該節點建立一個子節點/master_select/lock,利用Zookeeper特性,最終只有一臺機器能夠成功建立,成功的那臺機器就是Master。
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Recipes_MasterSelect {
static String master_path = "/curator_recipes_master_path";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("成為Master角色");
Thread.sleep(3000);
System.out.println("完成Master操作,釋放Master權利");
}
});
selector.autoRequeue();
selector.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
執行結果:
成為Master角色
完成Master操作,釋放Master權利
成為Master角色
以上結果會反覆迴圈,並且當一個應用程式完成Master邏輯後,另外一個應用程式的相應方法才會被呼叫,即當一個應用例項成為Master後,其他應用例項會進入等待,直到當前Master掛了或者推出後才會開始選舉Master。
3.11 分散式鎖
為了保證資料的一致性,經常在程式的某個執行點需要進行同步控制。以流水號生成場景為例,普通的後臺應用通常採用時間戳方式來生成流水號,但是在使用者量非常大的情況下,可能會出現併發問題。
package com.hust.grid.leesf.curator.examples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class Recipes_NoLock {
public static void main(String[] args) throws Exception {
final CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
try {
down.await();
} catch (Exception e) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.err.println("生成的訂單號是 : " + orderNo);
}
}).start();
}
down.countDown();
}
}
執行結果:
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|592
生成的訂單號是 : 16:29:10|591
結果表示訂單號出現了重複,即普通的方法無法滿足業務需要,因為其未進行正確的同步。可以使用Curator來實現分散式鎖功能。
package com.hust.grid.leesf.curator.examples;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Recipes_Lock {
static String lock_path = "/curator_recipes_lock_path";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
final CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(new Runnable() {
public void run() {
try {
down.await();
lock.acquire();
} catch (Exception e) {
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.out.println("生成的訂單號是 : " + orderNo);
try {
lock.release();
} catch (Exception e) {
}
}
}).start();
}
down.countDown();
}
}
執行結果:
生成的訂單號是 : 16:31:50|293
生成的訂單號是 : 16:31:50|319
生成的訂單號是 : 16:31:51|278
生成的訂單號是 : 16:31:51|326
生成的訂單號是 : 16:31:51|402
生成的訂單號是 : 16:31:51|420
生成的訂單號是 : 16:31:51|546
生成的訂單號是 : 16:31:51|602
生成的訂單號是 : 16:31:51|626
生成的訂單號是 : 16:31:51|656
生成的訂單號是 : 16:31:51|675
生成的訂單號是 : 16:31:51|701
生成的訂單號是 : 16:31:51|708
生成的訂單號是 : 16:31:51|732
生成的訂單號是 : 16:31:51|763
生成的訂單號是 : 16:31:51|785
生成的訂單號是 : 16:31:51|805
生成的訂單號是 : 16:31:51|823
生成的訂單號是 : 16:31:51|839
生成的訂單號是 : 16:31:51|853
生成的訂單號是 : 16:31:51|868
生成的訂單號是 : 16:31:51|884
生成的訂單號是 : 16:31:51|897
生成的訂單號是 : 16:31:51|910
生成的訂單號是 : 16:31:51|926
生成的訂單號是 : 16:31:51|939
生成的訂單號是 : 16:31:51|951
生成的訂單號是 : 16:31:51|965
生成的訂單號是 : 16:31:51|972
生成的訂單號是 : 16:31:51|983
結果表明此時已經不存在重複的流水號。
3.12 分散式計數器
分散式計數器的典型應用是統計系統的線上人數,藉助Zookeeper也可以很方便實現分散式計數器功能:指定一個Zookeeper資料節點作為計數器,多個應用例項在分散式鎖的控制下,通過更新節點的內容來實現計數功能。
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
public class Recipes_DistAtomicInt {
static String distatomicint_path = "/curator_recipes_distatomicint_path";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distatomicint_path,
new RetryNTimes(3, 1000));
AtomicValue<Integer> rc = atomicInteger.add(8);
System.out.println("Result: " + rc.succeeded());
}
}
執行結果:
Result: true
結果表明已經將資料成功寫入資料節點中。
3.13 分散式Barrier
如同JDK的CyclicBarrier,Curator提供了DistributedBarrier來實現分散式Barrier。
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Recipes_Barrier {
static String barrier_path = "/curator_recipes_barrier_path";
static DistributedBarrier barrier;
public static void main(String[] args) throws Exception {
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
public void run() {
try {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
barrier = new DistributedBarrier(client, barrier_path);
System.out.println(Thread.currentThread().getName() + "號barrier設定");
barrier.setBarrier();
barrier.waitOnBarrier();
System.err.println("啟動...");
} catch (Exception e) {
}
}
}).start();
}
Thread.sleep(2000);
barrier.removeBarrier();
}
}
執行結果:
Thread-1號barrier設定
Thread-2號barrier設定
Thread-4號barrier設定
Thread-3號barrier設定
Thread-0號barrier設定
啟動...
啟動...
啟動...
啟動...
啟動...
結果表明通過DistributedBarrier可以實現類似於CyclicBarrier的分散式Barrier功能。
四、Curator工具類
4.1 ZKPaths
其提供了簡單的API來構建znode路徑、遞迴建立、刪除節點等。
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZKPaths.PathAndNode;
import org.apache.zookeeper.ZooKeeper;
public class ZKPaths_Sample {
static String path = "/curator_zkpath_sample";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();
System.out.println(ZKPaths.fixForNamespace(path, "sub"));
System.out.println(ZKPaths.makePath(path, "sub"));
System.out.println(ZKPaths.getNodeFromPath("/curator_zkpath_sample/sub1"));
PathAndNode pn = ZKPaths.getPathAndNode("/curator_zkpath_sample/sub1");
System.out.println(pn.getPath());
System.out.println(pn.getNode());
String dir1 = path + "/child1";
String dir2 = path + "/child2";
ZKPaths.mkdirs(zookeeper, dir1);
ZKPaths.mkdirs(zookeeper, dir2);
System.out.println(ZKPaths.getSortedChildren(zookeeper, path));
ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
}
}
執行結果:
/curator_zkpath_sample/sub
/curator_zkpath_sample/sub
sub1
/curator_zkpath_sample
sub1
[child1, child2]
藉助ZKPaths可快速方便的完成節點的建立等操作。
4.2 EnsurePath
其提供了一種能夠確保資料節點存在的機制,當上層業務希望對一個數據節點進行操作時,操作前需要確保該節點存在。
package com.hust.grid.leesf.curator.examples;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;
public class EnsurePathDemo {
static String path = "/zk-book/c1";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
public static void main(String[] args) throws Exception {
client.start();
client.usingNamespace("zk-book");
EnsurePath ensurePath = new EnsurePath(path);
ensurePath.ensure(client.getZookeeperClient());
ensurePath.ensure(client.getZookeeperClient());
EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/c1");
ensurePath2.ensure(client.getZookeeperClient());
}
}
EnsurePath採取瞭如下節點建立方式,試圖建立指定節點,如果節點已經存在,那麼就不進行任何操作,也不對外丟擲異常,否則正常建立資料節點。
五、總結
本篇介紹了使用Zookeeper的開源客戶端如何操作Zookeeper的方法,相應的原始碼也已經上傳至github,謝謝各位園友的觀看~