Netty之心跳檢測(可應用在分散式架構)
這次繼續上一篇說到的心跳檢測
我們使用Socket通訊一般經常會處理多個伺服器之間的心跳檢測,一般來講我們去維護伺服器叢集,肯定要有一臺或者N臺伺服器主機(Master),然後還應該有N臺(Slave),那麼我們的主機肯定要時時刻刻知道自己下面的從伺服器的各方面情況,然後進行實時監控的功能,這個在分散式架構裡叫做心跳檢測或者是心跳監控。最佳處理方案還是覺得是使用一些通訊框架進行實現,我們的Netty就可以去做這樣一件事情。
過程是這樣的,從伺服器Slave先發送ip+key給主伺服器Master進行認證,Master會有屬於自己管理的slave的資訊(存在快取或者資料庫也好),將Slave傳送過來的認證資訊進行認證(比對ip和key是否是存在且正確的),如果認證成功Master會返回認證成功的資訊給Slave,當Slave接受到認證成功的訊息時,就會開始啟動定時執行緒去定時傳送心跳包給Master。現在,我們暫時傳送的心跳包的資料為Slave的cpu資訊和記憶體資訊。那麼伺服器的cpu資訊和記憶體資訊怎麼獲取到呢,我們可以使用Sigar。
Sigar的介紹與使用:
詳細的就不說了,下面的兩篇博文寫得挺好的。瞭解一下就ok。
最需要注意的是:Sigar為不同平臺提供了不同的庫檔案,我們記得需要將庫檔案放到jdk安裝路徑的bin資料夾下。
下面是解壓後的東東,那兩個包拉到自己的專案中,然後我的系統是windows64位,所以將sigar-amd64-winnt.dll放到jdk安裝路徑的bin資料夾下。
現在上程式碼吧。伺服器Server必做是主伺服器Master,客戶端Client比作是從伺服器Slave。
伺服器Server:
public class Server { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //新增Marshalling的編解碼器 ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()); ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder()); ch.pipeline().addLast(new ServerHeartBeatHandler()); } }); ChannelFuture cf = sb.bind(8765).sync(); cf.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
伺服器Handler:
public class ServerHeartBeatHandler extends ChannelHandlerAdapter{ /** * 將從伺服器的資訊放在這裡,正常應該是從資料庫拿或者從快取拿 */ private static Map<String,String> AUTH_IP_MAP = new HashMap<String, String>(); /** * 認證成功返回給slave的key,一般是slave傳過來的key的再加密後的東西,但是這裡簡單處理了 */ private static final String SUCCESS_KEY = "auth_success_key"; static{ /** * 假設有一臺slave ip:169.254.138.182 key:1234 */ AUTH_IP_MAP.put("169.254.138.182", "1234"); //slave的ip+key } private void auth(ChannelHandlerContext cx,Object msg){ String[] result = ((String)msg).split(","); String authKey = AUTH_IP_MAP.get(result[0]); if(authKey !=null && authKey.equals(result[1])){ cx.writeAndFlush(SUCCESS_KEY); //認證成功給slave返回資訊 }else{ cx.writeAndFlush("auth failure!").addListener(ChannelFutureListener.CLOSE); //認證不成功就返回失敗資訊並關閉通道 } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ //如果是字串型別為認證 auth(ctx,msg); }else if(msg instanceof RequestInfo){ //心跳包,將資料讀出來 System.out.println("心跳"); RequestInfo info = (RequestInfo)msg; System.out.println("--------------------------------------"); System.out.println("當前主機ip為:"+info.getIp()); System.out.println("當前主機CPU的情況:"); Map<String,Object> cpuPercMap = info.getCpuPercMap(); System.out.println("總使用率:"+cpuPercMap.get("combined")); System.out.println("使用者使用率:"+cpuPercMap.get("user")); System.out.println("系統使用率:"+cpuPercMap.get("sys")); System.out.println("等待率:"+cpuPercMap.get("wait")); System.out.println("空閒率"+cpuPercMap.get("idle")); System.out.println("當前主機記憶體的情況:"); Map<String,Object> memMap = info.getMemoryMap(); System.out.println("記憶體總量:"+memMap.get("total")); System.out.println("當前記憶體使用量:"+memMap.get("used")); System.out.println("當前記憶體剩餘量:"+memMap.get("free")); System.out.println("--------------------------------------"); ctx.writeAndFlush("info received!"); }else{ ctx.writeAndFlush("connect failure").addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客戶端Client:
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bs = new Bootstrap();
bs.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder());
ch.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder());
ch.pipeline().addLast(new ClientHeartBeatHandler());
}
});
ChannelFuture ch = bs.connect("127.0.0.1", 8765).sync();
ch.channel().closeFuture().sync();
workGroup.shutdownGracefully();
}
}
客戶端Handler:
public class ClientHeartBeatHandler extends ChannelHandlerAdapter{
private ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); //定時執行緒池,做心跳監控用
private ScheduledFuture<?> heartBeat;
private InetAddress addr;
private static final String SUCCESS_KEY = "auth_success_key"; //認證成功的key,slave和master一定要約定好
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//通道開啟活躍時,進行認證
System.out.println("進行認證");
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
System.out.println(ip);
String key = "1234";
String auth = ip+","+key;
ctx.writeAndFlush(auth);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
/**
* 接收master返回的認證資訊
*/
if(msg instanceof String){
String result = (String)msg;
if(SUCCESS_KEY.equals(result)){ //如果認證成功了,主動傳送心跳資料
/**
* 第一個引數:執行的實現Runnable介面的任務
* 第二個引數:初始化延遲時間
* 第三個引數:每次執行任務的間隔
* 第四個引數:時間間隔的單位
*/
this.heartBeat = this.pool.scheduleWithFixedDelay(new HeartBeatTask(ctx), 2, 5, TimeUnit.SECONDS);
System.out.println(msg);
}else{
System.out.println(msg);
}
}
}finally{
ReferenceCountUtil.release(msg);
}
}
private class HeartBeatTask implements Runnable{
private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() { //往master發資料
try {
RequestInfo info = new RequestInfo();
//ip
info.setIp(addr.getHostAddress());
//cpu prec
Map<String,Object> cpuPercMap = new HashMap<String,Object>();
Sigar sigar = new Sigar();
CpuPerc cpuPerc = sigar.getCpuPerc();
cpuPercMap.put("combined", cpuPerc.getCombined());
cpuPercMap.put("user",cpuPerc.getUser());
cpuPercMap.put("sys",cpuPerc.getSys());
cpuPercMap.put("wait",cpuPerc.getWait());
cpuPercMap.put("idel",cpuPerc.getIdle());
//memory
Map<String,Object> memoryMap = new HashMap<String,Object>();
Mem mem = sigar.getMem();
memoryMap.put("total", mem.getTotal() / 1024L); //原本資料的單位為bit,現在轉為k
memoryMap.put("used", mem.getUsed() / 1024L);
memoryMap.put("free", mem.getFree() / 1024L);
info.setCpuPercMap(cpuPercMap);
info.setMemoryMap(memoryMap);
ctx.writeAndFlush(info);
} catch (SigarException e) {
e.printStackTrace();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Marshalling工廠就不放出來了,上上篇有。
心跳資料實體:因為傳送的心跳資料是實體,所以請一定記得實現Serializable介面!!!!!不然程式碼對了都傳送不了。
public class RequestInfo implements Serializable{
private static final long serialVersionUID = 1L;
private String ip;
private Map<String,Object> cpuPercMap; //cpu資料
private Map<String,Object> memoryMap; //記憶體資料
//... other field
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Map<String, Object> getCpuPercMap() {
return cpuPercMap;
}
public void setCpuPercMap(Map<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}
public Map<String, Object> getMemoryMap() {
return memoryMap;
}
public void setMemoryMap(Map<String, Object> memoryMap) {
this.memoryMap = memoryMap;
}
}
測試:這裡是測試成功的結果,如果不想成功,隨便弄一個ip或者key不相同就好了,伺服器會發送cnnect failure,然後斷開與客戶端連線的通道。
伺服器:
17:42:29] nioEventLoopGroup-0-0 INFO [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x122e050e] REGISTERED
[17:42:29] nioEventLoopGroup-0-0 INFO [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x122e050e] BIND: 0.0.0.0/0.0.0.0:8765
[17:42:29] nioEventLoopGroup-0-0 INFO [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x122e050e, /0:0:0:0:0:0:0:0:8765] ACTIVE
[17:42:33] nioEventLoopGroup-0-0 INFO [] [] [io.netty.handler.logging.LoggingHandler] - [id: 0x122e050e, /0:0:0:0:0:0:0:0:8765] RECEIVED: [id: 0x40e3e2d4, /127.0.0.1:56210 => /127.0.0.1:8765]
心跳
--------------------------------------
當前主機ip為:169.254.138.182
當前主機CPU的情況:
總使用率:0.07003501750875438
使用者使用率:0.02351175587793897
系統使用率:0.04652326163081541
等待率:0.0
空閒率null
當前主機記憶體的情況:
記憶體總量:12455076
當前記憶體使用量:6589636
當前記憶體剩餘量:5865440
--------------------------------------
心跳
--------------------------------------
當前主機ip為:169.254.138.182
當前主機CPU的情況:
總使用率:0.07003501750875438
使用者使用率:0.04652326163081541
系統使用率:0.02351175587793897
等待率:0.0
空閒率null
當前主機記憶體的情況:
記憶體總量:12455076
當前記憶體使用量:6585724
當前記憶體剩餘量:5869352
--------------------------------------
心跳
--------------------------------------
當前主機ip為:169.254.138.182
當前主機CPU的情況:
總使用率:0.14092953523238383
使用者使用率:0.062468765617191405
系統使用率:0.07846076961519241
等待率:0.0
空閒率null
當前主機記憶體的情況:
記憶體總量:12455076
當前記憶體使用量:6584932
當前記憶體剩餘量:5870144
--------------------------------------
java.io.IOException: 遠端主機強迫關閉了一個現有的連線。
客戶端:
進行認證
169.254.138.182
auth_success_key
info received!
info received!
info received!
小結:可以看到,客戶端一直在執行定時任務給服務端傳送心跳資料,伺服器在接受到心跳資料後也給客戶端寫回響應了,就這麼一直迴圈下去。最後是我直接斷開客戶端連線才停止了心跳檢測。