zookeeper實現系統註冊和系統發現
阿新 • • 發佈:2018-12-02
需求:多系統整合,需要登陸任意系統後都可以顯示業務系統名稱,並得到對應地址資訊等。
實現思路:使用zookeeper作為系統註冊,每個系統啟動的時候進行註冊系統資訊,臨時有序為註冊型別,並且註冊事件監聽,並獲取所 有子節點的系統資訊,新增至靜態變數,考慮到部分系統可能會進行叢集部署需要進行系統資訊的去重.
環境以及工具包
zookeeper3.4.10,系統後端均使用spring框架,zkclient工具包
1.引入pom
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient --> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
2.編寫java程式碼
RegisterServer啟動註冊類和SystemRegisterInfo資訊儲存類以及ZKChildListener事件監聽實現
package fsl.lcp.utils.zk; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import fsl.lcp.constant.RentConstant; /** * 初始化 * @author Jun * */ public class RegisterServer { private Logger logger = LoggerFactory.getLogger(fsl.lcp.utils.zk.RegisterServer.class); public static volatile List<SystemRegisterInfo> systemList = null; public ZkClient zkClient=null; @Value("${zookeeper.connectString}") private String connectString;//zookeeper地址 @Value("${zookeeper.nodeName}") private String nodeName;//節點 @Value("${zookeeper.localIp}") private String localIp;//本系統ip @Value("${zookeeper.localPort}") private int localPort;//本系統埠 /** * 初始化 */ public void init(){ try { zkClient = new ZkClient(connectString, 5000); logger.debug("====zookeeper====connect===is==ok=========="); SystemRegisterInfo systemRegisterInfo = new SystemRegisterInfo(); systemRegisterInfo.setIp(localIp); systemRegisterInfo.setPort(localPort); systemRegisterInfo.setName(RentConstant.SYSTEMNAME_LOCAL); systemRegisterInfo.setNextNode(RentConstant.SYSTEMCODE_LOCAL); zkClient.createPersistent(nodeName, true); //建立節點-臨時有序 String path = zkClient.create(nodeName +"/"+systemRegisterInfo.getNextNode(), systemRegisterInfo, CreateMode.EPHEMERAL_SEQUENTIAL); //輸出建立節點的路徑 logger.debug("=========zookeeper==========created path:" + path); //註冊事件監聽 zkClient.subscribeChildChanges(nodeName, new ZKChildListener(zkClient)); systemList=new ArrayList<SystemRegisterInfo>(); List<String> children = zkClient.getChildren(nodeName);//獲取子節點 for (String string : children) { Object readData2 = zkClient.readData(nodeName+"/"+string); SystemRegisterInfo readData = (SystemRegisterInfo) readData2; systemList.add(readData); } List<SystemRegisterInfo> collect = systemList.stream().distinct().collect(Collectors.toList()); systemList=collect; } catch (Exception e) { logger.debug("==========zookeeper==========connect==============" + e.getMessage()); } } }
package fsl.lcp.utils.zk; import java.io.Serializable; import com.fasterxml.jackson.databind.annotation.JsonSerialize; /** * 系統註冊資訊 * @author jun * */ @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) public class SystemRegisterInfo implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String ip;//本系統ip private int port;//本系統埠 private String name;//系統名稱 private String nextNode;//本系統節點地址 private String isLogin="N";//是否本系統登陸獲取 private String attribute1;//擴充套件欄位 private String attribute2;//擴充套件欄位 private String attribute3;//擴充套件欄位 private String attribute4;//擴充套件欄位 public String getAttribute1() { return attribute1; } public void setAttribute1(String attribute1) { this.attribute1 = attribute1; } public String getAttribute2() { return attribute2; } public void setAttribute2(String attribute2) { this.attribute2 = attribute2; } public String getAttribute3() { return attribute3; } public void setAttribute3(String attribute3) { this.attribute3 = attribute3; } public String getAttribute4() { return attribute4; } public void setAttribute4(String attribute4) { this.attribute4 = attribute4; } public String getIsLogin() { return isLogin; } public void setIsLogin(String isLogin) { this.isLogin = isLogin; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getNextNode() { return nextNode; } public void setNextNode(String nextNode) { this.nextNode = nextNode; } @Override public int hashCode() { String str = ip + isLogin+name+nextNode+port; return str.hashCode(); } @Override public boolean equals(Object obj) { SystemRegisterInfo p = (SystemRegisterInfo) obj; return name.equals(p.getName()) && isLogin.equals(p.getIsLogin())&& ip.equals(p.getIp())&& nextNode.equals(p.getNextNode())&&port==p.getPort(); } }
package fsl.lcp.utils.zk;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 節點監聽
* @author Jun
*
*/
public class ZKChildListener implements IZkChildListener{
private Logger logger = LoggerFactory.getLogger(fsl.lcp.utils.zk.ZKChildListener.class);
public ZkClient zkClient=null;
public ZKChildListener(ZkClient zkClient) {
super();
this.zkClient = zkClient;
}
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
List<SystemRegisterInfo> list=new ArrayList<>();
for (String string : currentChilds) {
Object readData2=null;
try {
readData2 = zkClient.readData(parentPath+"/"+string);
} catch (Exception e) {
e.printStackTrace();
logger.debug("=======zookeeper====node="+parentPath+"/"+string+"==========wrong=========="+e.getMessage());
}
if(null!=readData2){
SystemRegisterInfo readData = (SystemRegisterInfo) readData2;
list.add(readData);
}
}
RegisterServer.systemList=list.stream().distinct().collect(Collectors.toList());
}
public ZKChildListener() {
super();
}
}
3.注入Bean,並在啟動的時候進行初始化
<bean id="registerServer" class="fsl.lcp.utils.zk.RegisterServer" scope="singleton" init-method="init"></bean>
4.編寫Resit註冊資訊測試類
package fsl.lcp.utils.zk;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
public class Resit {
public ZkClient zkClient=null;
public static void main(String[] args) {
Resit re=new Resit();
re.createNode();
}
public void createNode(){
//zk叢集的地址
String ZKServers = "127.0.0.1:2181";
zkClient = new ZkClient(ZKServers, 5000);
SystemRegisterInfo systemRegisterInfo = new SystemRegisterInfo();
systemRegisterInfo.setIp("189.23.23.65");
systemRegisterInfo.setPort(80);
systemRegisterInfo.setName("資產系統");
systemRegisterInfo.setNextNode("capital");
String path = zkClient.create("/lcp-all/capital", systemRegisterInfo, CreateMode.EPHEMERAL_SEQUENTIAL);
try {
this.handle(path);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//伺服器的具體業務處理功能
private void handle(String serverName) throws Exception {
System.out.println("server " + serverName
+ " is waiting for task process......");
Thread.sleep(Long.MAX_VALUE);
}
}
5.測試
先啟動專案,註冊本系統資訊,這時先進行獲取系統資訊列表,然後在執行一次Resit類,再獲取系統列表資訊
補充:logback為日誌框架,zookeeper預設會列印心跳日誌,如下更改,取消日誌
<logger name="org.apache.zookeeper.ClientCnxn" level="info" />
參考
https://www.2cto.com/kf/201707/661220.html
https://blog.csdn.net/sun_wangdong/article/details/77461108
考慮到zookeeper伺服器宕機情況,RegisterServer.java程式碼修改如下
package com.fsl.lcp.utils.zk;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
/**
* 初始化
* @author Jun
* 2018年11月1日
*/
public class RegisterServer {
private Logger logger = LoggerFactory.getLogger(com.fsl.lcp.utils.zk.RegisterServer.class);
public static volatile List<SystemRegisterInfo> systemList = null;
public static volatile SystemRegisterInfo systemRegisterInfo= null;
public ZkClient zkClient=null;
@Value("${zookeeper.connectString}")
private String connectString;//zookeeper地址
@Value("${zookeeper.nodeName}")
private String nodeName;//節點
@Value("${zookeeper.localIp}")
private String localIp;//本系統ip
@Value("${zookeeper.localPort}")
private int localPort;//本系統埠
@Value("${zookeeper.localSystemName}")
private String localSystemName;//本系統描述
@Value("${zookeeper.localSystemCode}")
private String localSystemCode;//本系統編碼
/**
* 初始化
*/
public void init(){
try {
systemRegisterInfo = new SystemRegisterInfo();
systemRegisterInfo.setIp(localIp);
systemRegisterInfo.setPort(localPort);
systemRegisterInfo.setName(localSystemName);
systemRegisterInfo.setNextNode(localSystemCode);
zkClient = new ZkClient(connectString, 5000);
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
if("Disconnected".equals(state.name())){
//斷開了
systemList=new ArrayList<>();
logger.debug("====zookeeper====connect===is==Disconnected==========");
}else if("SyncConnected".equals(state.name())){
//又連線上了
getInfo(zkClient);
logger.debug("====zookeeper====connect===is==SyncConnected==========");
}else{
logger.debug("====zookeeper====connect===is============"+state.name());
}
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
// TODO Auto-generated method stub
logger.debug("====zookeeper====connect===is============"+error.getMessage());
}
@Override
public void handleNewSession() throws Exception {
logger.debug("====zookeeper====connect===is=======new session=====");
}
});
logger.debug("====zookeeper====connect===is==ok==========");
logger.debug("==========zookeeper=====local=====info=======is======="+systemRegisterInfo );
getInfo(zkClient);
} catch (Exception e) {
logger.debug("==========zookeeper==========connect==============" + e.getMessage());
}
}
/**
* 獲取資訊
* @param zkClient
* @param systemRegisterInfo
*/
public void getInfo(ZkClient zkClient){
zkClient.createPersistent(nodeName, true);
//建立節點-臨時有序
String path = zkClient.create(nodeName +"/"+systemRegisterInfo.getNextNode(), systemRegisterInfo,
CreateMode.EPHEMERAL_SEQUENTIAL);
//輸出建立節點的路徑
logger.debug("=========zookeeper==========created path:" + path);
//註冊事件監聽
zkClient.subscribeChildChanges(nodeName, new ZKChildListener(zkClient));
systemList=new ArrayList<SystemRegisterInfo>();
List<String> children = zkClient.getChildren(nodeName);//獲取子節點
for (String string : children) {
Object readData2 = zkClient.readData(nodeName+"/"+string);
SystemRegisterInfo readData = (SystemRegisterInfo) readData2;
logger.debug("==========zookeeper=====startNow=====allInfo=======is======="+readData );
systemList.add(readData);
}
List<SystemRegisterInfo> collect = systemList.stream().distinct().collect(Collectors.toList());
systemList=collect;
}
}