Zookeeper客戶端Curator使用指南
what is Curator ?
Curator是zookeeper分散式協調服務的java客戶端庫,它包裝了一系列操作zk的高階API和實用庫,是的操作zk變得更加容易和可靠。例如使用原生zk的API實現分散式鎖的話,程式碼量多,複雜,使用Curator後就相對簡單的多,很多底層的api都直接封裝好了,開箱即用,學習成本低。
Getting Started
1、使用Curator之前,你需要引入maven依賴
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.8.0</version> </dependency>
2、例項化Curator,你可以通過CuratorFrameworkFactory類提供的來產生一個CuratorFramework物件
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)
zookeeperConnectionString就是連線的ip:埠資訊,retryPolicy是重試策略,Curator提供了三種常用的重試策略,這裡不詳述
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start();
或者你也可以使用鏈式呼叫來例項化curator
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(zookeeperConnectionString)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
curatorFramework.start();
How to use Curator API operate ZK ?
curator操作zk的api主要包括 節點的增刪改查、節點判斷、節點監聽等,下面的程式碼演示瞭如何使用基本的curator api
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 類描述:zookeeper客戶端curator使用demo
*/
public class CuratorDemo {
private static final Logger logger = LoggerFactory.getLogger(CuratorDemo.class);
private static final String NODE_PATH = "/node_8";
private static final String CONNECT_TOSTRING = "10.200.121.46:2181";
/*建立執行緒池,供給非同步使用curator時呼叫*/
public static ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
try {
/*重試策略一:重試三次,每重試一次,重試的間隔時間會越來越大
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
*/
/*重試策略二:最多重試三次,每次重試間隔1s
RetryPolicy retryPolicy1 = new RetryNTimes(3,1000);
*/
/*重試策略三:最大重試時間總和不超過5s,每次重試間隔為1s*/
RetryPolicy retryPolicy2 = new RetryUntilElapsed(5000, 1000);
/*
/* 方式一建立zookeeper連線
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_TOSTRING,5000,5000,retryPolicy2);
*/
/*方式二建立zookeeper連線*/
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNECT_TOSTRING)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy2)
.build();
curatorFramework.start();
/*建立節點資料*/
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(NODE_PATH, "456".getBytes());
/*刪除節點(包含子節點)*/ curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath(NODE_PATH);
/*獲取子節點*/
List<String> strings = curatorFramework.getChildren().forPath(NODE_PATH);
/*獲取節點資料內容*/
byte[] bytes = curatorFramework.getData().forPath(NODE_PATH);
System.out.println(new String(bytes));
/*獲取節點資料內容+狀態資訊*/
Stat stat = new Stat();
byte[] result = curatorFramework.getData().storingStatIn(stat).forPath(NODE_PATH);
System.out.println(new String(result));
/*修改節點資料內容*/
curatorFramework.setData().forPath(NODE_PATH, "123".getBytes());
/*判斷節點是否存在*/
Stat stat1 = curatorFramework.checkExists().forPath(NODE_PATH);
/*非同步操作,以判斷節點是否存在為例,注意使用執行緒池以便節省單個執行緒的建立銷燬開銷,及最後執行緒的關閉*/
curatorFramework.checkExists().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
Object context = curatorEvent.getContext(); //這裡的上下文就是 傳遞進去的"123456"
}
}, "12345", executorService).forPath(NODE_PATH);
/*設定節點事件監聽*/
final NodeCache nodeCache = new NodeCache(curatorFramework, NODE_PATH);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] result = nodeCache.getCurrentData().getData();
logger.info("事件監聽result=" + new String(result));
}
});
/*設定子節點事件監聽*/
final PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, NODE_PATH, true);
childrenCache.start();
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
switch (type) {
case CHILD_ADDED:
logger.info("");
case CHILD_UPDATED:
logger.info("");
case CHILD_REMOVED:
logger.info("");
default:
break;
}
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}
使用Curator實現Master選舉
Curator中包裝的master選舉包含兩種,Leader Latch和Leader Election,原理採用的是zk的節點特性,即多個客戶端同時建立同一節點,zk保證只有一個客戶端能建立成功,成功的客戶端即為master節點,再master節點的機器上執行業務。Leader Latcher裡面即包裝了zk建立節點、設定監聽,對應的操作均包裝在LeaderLatch類中。LeaderLatch將隨機選出一個master
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 類描述:基於Curator的Leader Latch實現的master選舉
* 建立人:simonsfan
*/
@Component
public class CuratorLeaderLatch {
private static CuratorFramework curatorFramework;
private static LeaderLatch leaderLatch;
private static final String path = "/root/leaderlatch";
private static final String connectStr = "10.200.121.46:2181,10.200.121.159:2181,10.200.121.168:2181";
static {
curatorFramework = CuratorFrameworkFactory
.builder()
.connectString(connectStr)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.connectionTimeoutMs(5000)
.sessionTimeoutMs(5000)
.build();
curatorFramework.start();
leaderLatch = new LeaderLatch(curatorFramework, path.concat("/testtast"));
}
@Lazy
@Scheduled(cron = "")
public void testTask() {
//是master節點的執行業務流程,使用leaderLatch.hasLeadership()方法判斷是否為leader,true表示是master節點
if (!leaderLatch.hasLeadership()) return;
//TODO something
}
}
使用Curator實現分散式鎖
這裡講的的是Shared Reentrant Lock(共享可重入鎖,推薦使用,Curator還封裝了其他型別的鎖:共享不可重入鎖之類的):全域性同步的、公平的分散式共享重入式鎖,可保證在任意同一時刻,只有一個客戶端持有鎖。使用到的類是InterProcessMutex
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 類描述:Curator實現的分散式鎖
* 建立人:simonsfan
*/
public class DistributedLock {
private static CuratorFramework curatorFramework;
private static InterProcessMutex interProcessMutex;
private static final String connectString = "10.200.121.46:2181,10.200.121.43:2181,10.200.121.167:2181";
private static final String root = "/root";
private static ExecutorService executorService;
private String lockName;
public String getLockName() {
return lockName;
}
public void setLockName(String lockName) {
this.lockName = lockName;
}
static {
curatorFramework = CuratorFrameworkFactory.builder().connectString(connectString).connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
executorService = Executors.newCachedThreadPool();
curatorFramework.start();
}
public DistributedLock(String lockName) {
this.lockName = lockName;
interProcessMutex = new InterProcessMutex(curatorFramework, root.concat(lockName));
}
/*上鎖*/
public void tryLock() {
int count = 0;
try {
while (!interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
count++;
if (count > 3) {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*釋放*/
public void releaseLock() {
try {
if (interProcessMutex != null) {
interProcessMutex.release();
}
curatorFramework.delete().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
}
}, executorService).forPath(root.concat(lockName));
} catch (Exception e) {
e.printStackTrace();
}
}
}