從零到日誌採集索引視覺化、監控報警、rpc trace跟蹤-系統上下線監控
阿新 • • 發佈:2019-02-12
/**
 * Curator child listener that turns child-node events into alert messages.
 *
 * <p>For every handled event the application name and host are taken from the last two
 * segments of the node path. The alert text is produced by {@link #buildMsg}, sent through
 * {@code RabbitmqService.sendMessage}, logged, and the matching record is added, deleted or
 * updated through {@code AppInfoService}.
 *
 * <p>Node payloads are strings whose fields are separated by {@code Constants.SEMICOLON}.
 * For the node under {@code Constants.ROOT_PATH_PERSISTENT}, field 0 is handed to
 * {@code sendMessage} as its second argument and field 1 is used as the {@code deploy} value
 * of the alert. For the watched node (read on {@code CHILD_UPDATED}), field 0 is parsed as a
 * {@code long} timestamp and field 1 is used as the {@code deploy} value.
 */
public class AppChildrenChangeListener implements PathChildrenCacheListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(AppChildrenChangeListener.class);

    private final RabbitmqService rabbitmqService;
    private final ZkClient zkClient;
    private final AppInfoService appInfoService;

    /**
     * Creates a listener bound to the collaborators it reports through.
     *
     * @param rabbitmqService used to send the alert messages
     * @param zkClient        used to read node payloads
     * @param appInfoService  used to add, delete and update application records
     */
    public AppChildrenChangeListener(RabbitmqService rabbitmqService, ZkClient zkClient, AppInfoService appInfoService) {
        this.rabbitmqService = rabbitmqService;
        this.zkClient = zkClient;
        this.appInfoService = appInfoService;
    }

    /**
     * Dispatches {@code CHILD_ADDED}, {@code CHILD_REMOVED} and {@code CHILD_UPDATED} events
     * to their handlers; every other event type is ignored.
     *
     * @param client the Curator client that delivered the event (unused)
     * @param event  the child event; its data is only accessed for the three handled types
     * @throws Exception if reading a node, sending the message or updating records fails
     */
    @Override
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            case CHILD_ADDED:
                this.onChildAdded(event.getData().getPath());
                break;
            case CHILD_REMOVED:
                this.onChildRemoved(event.getData().getPath());
                break;
            case CHILD_UPDATED:
                this.onChildUpdated(event.getData().getPath());
                break;
            default:
                // Other event types were never handled here; keep ignoring them.
                break;
        }
    }

    /**
     * Handles a newly added node: sends the start alert once per node (guarded by
     * {@code CacheService.appHosts}) and always records the persistent/history entry.
     */
    private void onChildAdded(String node) throws Exception {
        String app = this.getApp(node);
        String host = this.getHost(node);
        if (!CacheService.appHosts.contains(node)) {
            String[] datas = this.readFields(this.persistentPath(app, host), 2);
            String info = this.buildMsg(this.currentTime(), app, host, datas[1], Constants.APP_START);
            // add to the queue
            this.rabbitmqService.sendMessage(info, datas[0]);
            LOGGER.info(info);
            CacheService.appHosts.add(node);
            this.appInfoService.add(host, app, Constants.ZK_NODE_TYPE_EPHEMERAL, LogCollectionStatus.RUNNING);
        }
        this.appInfoService.add(host, app, Constants.ZK_NODE_TYPE_PERSISTENT, LogCollectionStatus.HISTORY);
    }

    /**
     * Handles a removed node: always sends the stop alert, then drops the node from
     * {@code CacheService.appHosts} and deletes its record if it was cached.
     */
    private void onChildRemoved(String node) throws Exception {
        String app = this.getApp(node);
        String host = this.getHost(node);
        String[] datas = this.readFields(this.persistentPath(app, host), 2);
        String info = this.buildMsg(this.currentTime(), app, host, datas[1], Constants.APP_STOP);
        // add to the queue
        this.rabbitmqService.sendMessage(info, datas[0]);
        LOGGER.info(info);
        if (CacheService.appHosts.contains(node)) {
            CacheService.appHosts.remove(node);
            this.appInfoService.delete(host, app, Constants.ZK_NODE_TYPE_EPHEMERAL);
        }
    }

    /**
     * Handles an updated node: reads the timestamp and deploy value from the node itself,
     * sends the appender-stop alert and marks the record as stopped.
     */
    private void onChildUpdated(String node) throws Exception {
        String[] datas = this.readFields(node, 2);
        String app = this.getApp(node);
        String host = this.getHost(node);
        String time = DateUtil.format(new Date(Long.parseLong(datas[0])), DateUtil.YYYYMMDDHHMMSS);
        String info = this.buildMsg(time, app, host, datas[1], Constants.APP_APPENDER_STOP);
        // The second sendMessage argument still comes from the persistent node, read after the alert is built.
        String target = this.readFields(this.persistentPath(app, host), 1)[0];
        // add to the queue
        this.rabbitmqService.sendMessage(info, target);
        LOGGER.info(info);
        this.appInfoService.update(host, app, Constants.ZK_NODE_TYPE_EPHEMERAL, LogCollectionStatus.STOPPED);
    }

    /**
     * Builds the path of the node under {@code Constants.ROOT_PATH_PERSISTENT} for an app/host pair.
     */
    private String persistentPath(String app, String host) {
        return Constants.ROOT_PATH_PERSISTENT + Constants.SLASH + app + Constants.SLASH + host;
    }

    /**
     * Reads a node's payload and splits it on {@code Constants.SEMICOLON}.
     *
     * @param path      node to read
     * @param minFields number of leading fields the caller is going to index
     * @return the split payload, holding at least {@code minFields} entries
     * @throws IllegalStateException if the node holds no data or too few fields; this replaces
     *                               the bare NullPointerException / ArrayIndexOutOfBoundsException
     *                               the unchecked access would otherwise raise
     */
    private String[] readFields(String path, int minFields) {
        Object raw = this.zkClient.readData(path);
        if (raw == null) {
            throw new IllegalStateException("No data stored at zookeeper node " + path);
        }
        String[] fields = raw.toString().split(Constants.SEMICOLON);
        if (fields.length < minFields) {
            throw new IllegalStateException("Expected at least " + minFields + " fields at zookeeper node "
                    + path + " but found " + fields.length);
        }
        return fields;
    }

    /**
     * Formats the current time with the {@code DateUtil.YYYYMMDDHHMMSS} pattern.
     */
    private String currentTime() {
        return DateUtil.format(new Date(), DateUtil.YYYYMMDDHHMMSS);
    }

    /**
     * Extracts the app name from a node path: the segment before the last one.
     *
     * @param node node path
     * @return the second-to-last path segment
     */
    private String getApp(String node) {
        String tmp = node.substring(0, node.lastIndexOf(Constants.SLASH));
        return this.getLast(tmp);
    }

    /**
     * Extracts the host from a node path: the last segment.
     *
     * @param node node path
     * @return the last path segment
     */
    private String getHost(String node) {
        return this.getLast(node);
    }

    /**
     * Returns the text after the last {@code Constants.SLASH} in the given string.
     *
     * @param line string to cut
     * @return the trailing segment
     */
    private String getLast(String line) {
        return line.substring(line.lastIndexOf(Constants.SLASH) + 1);
    }

    /**
     * Builds the alert message text.
     *
     * @param time   formatted event time
     * @param app    application name
     * @param host   host name
     * @param deploy deploy value taken from the node payload
     * @param msg    alert text
     * @return the {@code toString()} form of an {@code AlertDto} built from the arguments
     */
    private String buildMsg(String time, String app, String host, String deploy, String msg) {
        AlertDto alertDto = new AlertDto(time, app, host, deploy, msg);
        return alertDto.toString();
    }
}
// Logger used to record every alert message that is sent.
private static final Logger LOGGER = LoggerFactory.getLogger(AppChildrenChangeListener.class);
// Sends the alert messages (sendMessage).
private RabbitmqService rabbitmqService;
// Reads node payloads (readData).
private ZkClient zkClient;
// Adds, deletes and updates application records.
private AppInfoService appInfoService;
/**
 * Creates a listener bound to the collaborators it reports through.
 *
 * @param rabbitmqService used to send the alert messages
 * @param zkClient        used to read node payloads
 * @param appInfoService  used to add, delete and update application records
 */
public AppChildrenChangeListener(RabbitmqService rabbitmqService, ZkClient zkClient, AppInfoService appInfoService) {
    this.rabbitmqService = rabbitmqService;
    this.zkClient = zkClient;
    this.appInfoService = appInfoService;
}
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws
String node = Constants.EMPTY_STR;
String app = Constants.EMPTY_STR;
String host = Constants.EMPTY_STR;
String info = Constants.EMPTY_STR;
String[] datas = null;
switch (event.getType()) {
case CHILD_ADDED:
node = event.getData().getPath();
app = this.getApp(node);
if (!CacheService.appHosts.contains(node)) {
datas = this.zkClient.readData(Constants.ROOT_PATH_PERSISTENT + Constants.SLASH + app + Constants.SLASH + host).toString().split(Constants.SEMICOLON);
info = this.buildMsg(DateUtil.format(new Date(System.currentTimeMillis()), DateUtil.YYYYMMDDHHMMSS), app,
this.getHost(node), datas[1], Constants.APP_START);
// add to the queue
this.rabbitmqService.sendMessage(info, datas[0]);
LOGGER.info(info);
CacheService.appHosts.add(node);
this.appInfoService.add(host, app, Constants.ZK_NODE_TYPE_EPHEMERAL, LogCollectionStatus.RUNNING);
}
this.appInfoService.add(host, app, Constants.ZK_NODE_TYPE_PERSISTENT, LogCollectionStatus.HISTORY);
break;
case CHILD_REMOVED:
node = event.getData().getPath();
app = this.getApp(node);
host = this.getHost(node);
datas = this.zkClient.readData(Constants.ROOT_PATH_PERSISTENT + Constants.SLASH + app + Constants.SLASH + host).toString().split(Constants.SEMICOLON);
info = this.buildMsg(DateUtil.format(new Date(System.currentTimeMillis()), DateUtil.YYYYMMDDHHMMSS), app,
this.getHost(node), datas[1], Constants.APP_STOP);
// add to the queue
this.rabbitmqService.sendMessage(info, datas[0]);
LOGGER.info(info);
if (CacheService.appHosts.contains(node)) {
CacheService.appHosts.remove(node);
this.appInfoService.delete(host, app, Constants.ZK_NODE_TYPE_EPHEMERAL);
}
break;
case CHILD_UPDATED:
node = event.getData().getPath();
datas = this.zkClient.readData(node).toString().split(Constants.SEMICOLON);
app = this.getApp(node);
host = this.getHost(node);
info = this.buildMsg(DateUtil.format(new Date(Long.parseLong(datas[0])), DateUtil.YYYYMMDDHHMMSS), app,
this.getHost(node), datas[1], Constants.APP_APPENDER_STOP);
// add to the queue
this.rabbitmqService.sendMessage(info, this.zkClient.readData(Constants.ROOT_PATH_PERSISTENT + Constants.SLASH + app + Constants.SLASH + host).toString().split(Constants.SEMICOLON)[0]);
LOGGER.info(info);
this.appInfoService.update(host, app, Constants.ZK_NODE_TYPE_EPHEMERAL, LogCollectionStatus.STOPPED);
break;
}
}
/**
 * Resolves the app name from a node path, i.e. the segment just before the last one.
 *
 * @param node node path
 * @return the second-to-last path segment
 */
private String getApp(String node) {
    int lastSeparator = node.lastIndexOf(Constants.SLASH);
    String parentPath = node.substring(0, lastSeparator);
    return this.getLast(parentPath);
}
/**
 * Resolves the host from a node path, i.e. its last segment.
 *
 * @param node node path
 * @return the last path segment
 */
private String getHost(String node) {
    String host = this.getLast(node);
    return host;
}
/**
 * Returns whatever follows the last {@code Constants.SLASH} in the given string.
 *
 * @param line string to cut
 * @return the trailing segment
 */
private String getLast(String line) {
    int start = line.lastIndexOf(Constants.SLASH) + 1;
    return line.substring(start);
}
/**
 * Builds the alert message text.
 *
 * @param time   formatted event time
 * @param app    application name
 * @param host   host name
 * @param deploy deploy value taken from the node payload
 * @param msg    alert text
 * @return the {@code toString()} form of an {@code AlertDto} built from the arguments
 */
private String buildMsg(String time, String app, String host, String deploy, String msg) {
    return new AlertDto(time, app, host, deploy, msg).toString();
}
}