Springboot2(29)整合zookeeper的增刪改查、節點監聽、分散式讀寫鎖、分散式計數器
阿新 • • 發佈:2018-12-29
實現zookeeper節點的增刪改查、節點監聽、分散式讀寫鎖、分散式計數器
新增依賴
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <zookeeper.version>3.4.8</zookeeper.version> <curator.version>2.11.1</curator.version> </properties> <dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> </dependencies>
ZkClient(curator)
這裡使用的是curator,curator是對zookeeper的簡單封裝,提供了一些整合的方法,或者是提供了更優雅的api
/**
* zookeeper客戶端
*/
@Data
@Slf4j
public class ZkClient {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private CuratorFramework client;
public TreeCache cache;
private ZookeeperProperties zookeeperProperties;
public ZkClient(ZookeeperProperties zookeeperProperties){
this.zookeeperProperties = zookeeperProperties;
}
/**
* 初始化zookeeper客戶端
*/
public void init() {
try{
RetryPolicy retryPolicy = new ExponentialBackoffRetry( zookeeperProperties.getBaseSleepTimeMs(),
zookeeperProperties.getMaxRetries());
Builder builder = CuratorFrameworkFactory.builder()
.connectString(zookeeperProperties.getServer()).retryPolicy(retryPolicy)
.sessionTimeoutMs( zookeeperProperties.getSessionTimeoutMs())
.connectionTimeoutMs( zookeeperProperties.getConnectionTimeoutMs())
.namespace( zookeeperProperties.getNamespace());
if(StringUtils.isNotEmpty( zookeeperProperties.getDigest())){
builder.authorization("digest", zookeeperProperties.getDigest().getBytes("UTF-8"));
builder.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
client = builder.build();
client.start();
initLocalCache("/test");
// addConnectionStateListener();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
//連線丟失
logger.info("lost session with zookeeper");
} else if (state == ConnectionState.CONNECTED) {
//連線新建
logger.info("connected with zookeeper");
} else if (state == ConnectionState.RECONNECTED) {
logger.info("reconnected with zookeeper");
}
}
});
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 初始化本地快取
* @param watchRootPath
* @throws Exception
*/
private void initLocalCache(String watchRootPath) throws Exception {
cache = new TreeCache(client, watchRootPath);
TreeCacheListener listener = (client1, event) ->{
log.info("event:" + event.getType() +
" |path:" + (null != event.getData() ? event.getData().getPath() : null));
if(event.getData()!=null && event.getData().getData()!=null){
log.info("發生變化的節點內容為:" + new String(event.getData().getData()));
}
// client1.getData().
};
cache.getListenable().addListener(listener);
cache.start();
}
public void stop() {
client.close();
}
public CuratorFramework getClient() {
return client;
}
/**
* 建立節點
* @param mode 節點型別
* 1、PERSISTENT 持久化目錄節點,儲存的資料不會丟失。
* 2、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,儲存的資料不會丟失
* 3、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除
*4、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名。
* @param path 節點名稱
* @param nodeData 節點資料
*/
public void createNode(CreateMode mode, String path , String nodeData) {
try {
//使用creatingParentContainersIfNeeded()之後Curator能夠自動遞迴建立所有所需的父節點
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));
} catch (Exception e) {
logger.error("註冊出錯", e);
}
}
/**
* 建立節點
* @param mode 節點型別
* 1、PERSISTENT 持久化目錄節點,儲存的資料不會丟失。
* 2、PERSISTENT_SEQUENTIAL順序自動編號的持久化目錄節點,儲存的資料不會丟失
* 3、EPHEMERAL臨時目錄節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除
* 4、EPHEMERAL_SEQUENTIAL臨時自動編號節點,一旦建立這個節點的客戶端與伺服器埠也就是session 超時,這種節點會被自動刪除,並且根據當前已近存在的節點數自動加 1,然後返回給客戶端已經成功建立的目錄節點名。
* @param path 節點名稱
*/
public void createNode(CreateMode mode,String path ) {
try {
//使用creatingParentContainersIfNeeded()之後Curator能夠自動遞迴建立所有所需的父節點
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
} catch (Exception e) {
logger.error("註冊出錯", e);
}
}
/**
* 刪除節點資料
*
* @param path
*/
public void deleteNode(final String path) {
try {
deleteNode(path,true);
} catch (Exception ex) {
log.error("{}",ex);
}
}
/**
* 刪除節點資料
* @param path
* @param deleteChildre 是否刪除子節點
*/
public void deleteNode(final String path,Boolean deleteChildre){
try {
if(deleteChildre){
//guaranteed()刪除一個節點,強制保證刪除,
// 只要客戶端會話有效,那麼Curator會在後臺持續進行刪除操作,直到刪除節點成功
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
}else{
client.delete().guaranteed().forPath(path);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 設定指定節點的資料
* @param path
* @param datas
*/
public void setNodeData(String path, byte[] datas){
try {
client.setData().forPath(path, datas);
}catch (Exception ex) {
log.error("{}",ex);
}
}
/**
* 獲取指定節點的資料
* @param path
* @return
*/
public byte[] getNodeData(String path){
Byte[] bytes = null;
try {
if(cache != null){
ChildData data = cache.getCurrentData(path);
if(data != null){
return data.getData();
}
}
client.getData().forPath(path);
return client.getData().forPath(path);
}catch (Exception ex) {
log.error("{}",ex);
}
return null;
}
/**
* 獲取資料時先同步
* @param path
* @return
*/
public byte[] synNodeData(String path){
client.sync();
return getNodeData( path);
}
/**
* 判斷路徑是否存在
*
* @param path
* @return
*/
public boolean isExistNode(final String path) {
client.sync();
try {
return null != client.checkExists().forPath(path);
} catch (Exception ex) {
return false;
}
}
/**
* 獲取節點的子節點
* @param path
* @return
*/
public List<String> getChildren(String path) {
List<String> childrenList = new ArrayList<>();
try {
childrenList = client.getChildren().forPath(path);
} catch (Exception e) {
logger.error("獲取子節點出錯", e);
}
return childrenList;
}
/**
* 隨機讀取一個path子路徑, "/"為根節點對應該namespace
* 先從cache中讀取,如果沒有,再從zookeeper中查詢
* @param path
* @return
* @throws Exception
*/
public String getRandomData(String path) {
try{
Map<String,ChildData> cacheMap = cache.getCurrentChildren(path);
if(cacheMap != null && cacheMap.size() > 0) {
logger.debug("get random value from cache,path="+path);
Collection<ChildData> values = cacheMap.values();
List<ChildData> list = new ArrayList<>(values);
Random rand = new Random();
byte[] b = list.get(rand.nextInt(list.size())).getData();
return new String(b,"utf-8");
}
if(isExistNode(path)) {
logger.debug("path [{}] is not exists,return null",path);
return null;
} else {
logger.debug("read random from zookeeper,path="+path);
List<String> list = client.getChildren().forPath(path);
if(list == null || list.size() == 0) {
logger.debug("path [{}] has no children return null",path);
return null;
}
Random rand = new Random();
String child = list.get(rand.nextInt(list.size()));
path = path + "/" + child;
byte[] b = client.getData().forPath(path);
String value = new String(b,"utf-8");
return value;
}
}catch(Exception e){
log.error("{}",e);
}
return null;
}
/**
* 可重入共享鎖 -- Shared Reentrant Lock
* @param lockPath
* @param time
* @param dealWork 獲取
* @return
*/
public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
if (!lock.acquire(time, TimeUnit.SECONDS)) {
log.error("get lock fail:{}", " could not acquire the lock");
return null;
}
log.debug("{} get the lock",lockPath);
Object b = dealWork.deal();
return b;
}catch(Exception e){
log.error("{}", e);
}finally{
try {
lock.release();
} catch (Exception e) {
//log.error("{}",e);
}
}
return null;
}
/**
* 獲取讀寫鎖
* @param path
* @return
*/
public InterProcessReadWriteLock getReadWriteLock(String path){
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
return readWriteLock;
}
/**
* 在註冊監聽器的時候,如果傳入此引數,當事件觸發時,邏輯由執行緒池處理
*/
ExecutorService pool = Executors.newFixedThreadPool(2);
/**
* 監聽資料節點的變化情況
* @param watchPath
* @param listener
*/
public void watchPath(String watchPath,TreeCacheListener listener){
// NodeCache nodeCache = new NodeCache(client, watchPath, false);
TreeCache cache = new TreeCache(client, watchPath);
cache.getListenable().addListener(listener,pool);
try {
cache.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
配置檔案
zookeeper.enabled: true
#zookeeper.server: 47.106.106.53:9036,47.106.106.53:9037,47.106.106.53:9038
zookeeper.server: 10.10.2.137:2181,10.10.2.138:2181,10.10.2.139:2181
zookeeper.namespace: demo
zookeeper.digest: rt:rt #zkCli.sh acl 命令 addauth digest mpush
zookeeper.sessionTimeoutMs: 1000 #會話超時時間,單位為毫秒,預設60000ms,連線斷開後,其它客戶端還能請到臨時節點的時間
zookeeper.connectionTimeoutMs: 6000 #連線建立超時時間,單位為毫秒
zookeeper.maxRetries: 3 #最大重試次數
zookeeper.baseSleepTimeMs: 1000 #初始sleep時間 ,毫秒
程式會建立節點demo為namespace,之後所有增刪改查的操作都這節點下完成
Controller層方法
@Api(tags="zookeeper基本操作")
@RequestMapping("/zk")
@RestController
@Slf4j
public class ZookeeperController {
@Autowired
private ZkClient zkClient;
@Autowired
private ZkClient zkClientTest;
/**
* 建立節點
* @param type
* @param znode
* @return
*/
@ApiOperation(value = "建立節點",notes = "在名稱空間下建立節點")
@ApiImplicitParams({
@ApiImplicitParam(name ="type",value = "節點型別:<br> 0 持久化節點<br> 1 臨時節點<br> 2 持久順序節點<br> 3 臨時順序節點",
allowableValues = "0,1,2,3",defaultValue="3",paramType = "path",required = true,dataType = "Long"),
@ApiImplicitParam(name ="znode",value = "節點名稱",paramType = "path",required = true,dataType = "String"),
@ApiImplicitParam(name ="nodeData",value = "節點資料",paramType = "body",dataType = "String")
})
@RequestMapping(value = "/create/{type}/{znode}",method=RequestMethod.POST)
private String create(@PathVariable Integer type,@PathVariable String znode,@RequestBody String nodeData){
znode = "/" + znode;
try {
zkClient.createNode(CreateMode.fromFlag(type),znode,nodeData);
} catch (Kee