《netty權威指南》之模擬伺服器之間的心跳檢測
阿新 • • 發佈:2018-12-16
在叢集環境下伺服器之間是要定時進行心跳檢測的,那麼netty可以用來做這件事,
在叢集環境中,選定一臺伺服器做master,其餘的做slave
即master <==> server端
slave <==> 客戶端
客戶端定時向服務端傳送請求,當然在請求之前先進行認證
服務端程式碼如下
package com.lyzx.netty.netty06; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; /** * @author hero.li * */ public class Server{ public static void main(String[] args) throws InterruptedException { //開啟兩個執行緒組,一個用於接受客戶端的請求 另一個用於非同步的網路IO的讀寫 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //Netty啟動的輔助類 裡面封裝了客戶端和服務端的連結以及如何處理選擇器 selector等邏輯 ServerBootstrap b = new ServerBootstrap(); //傳入兩個執行緒組,設定傳輸塊大小為1k, //新增ServerHandler型別的過濾器(表示如何處理這些訊息,過濾器當然要整合netty的一個介面) b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE) .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception{ ChannelHandler[] arr = {MarshallingCodeCFactory.marshallingDecoder(), MarshallingCodeCFactory.marshallingEncoder(), new ReadTimeoutHandler(30), new ServerHandler()}; ch.pipeline().addLast(arr); } }); //同步等待繫結埠結束 ChannelFuture f = b.bind(9988).sync(); //等待服務端監聽埠關閉 f.channel().closeFuture().sync(); //優雅的關閉執行緒組 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
服務端Handler
package com.lyzx.netty.netty06; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 對於網事件做讀寫,通常只要關注channelRead()和exceptionCaught()即可 */ public class ServerHandler extends ChannelInboundHandlerAdapter { private static final String AUTH_SUCCESS_FLAG = "AUTH_SUCCESS"; private static final String AUTH_FAIL_FLAG = "AUTH_FAIL"; private static final Map<String,String> KEYS = new ConcurrentHashMap<>(); static{ //這兒本應該讀取資料庫以初始化可以訪問該伺服器的客戶端 KEYS.put("192.168.22.170","abcd007"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg.getClass() == String.class){ //剛建立連線時的握手資訊 String ipAndSuffix = String.valueOf(msg); String[] ipAndSuffiArr = ipAndSuffix.split("_"); String ip = ipAndSuffiArr[0]; String suffix = ipAndSuffiArr[1]; System.out.println("ip:"+ip+" , "+suffix); if(KEYS.containsKey(ip)){ if(suffix.equals(KEYS.get(ip))){ ctx.channel().writeAndFlush(AUTH_SUCCESS_FLAG); return; } } ctx.channel() .writeAndFlush(AUTH_FAIL_FLAG) .addListener(ChannelFutureListener.CLOSE); }else{ System.out.println("server:channelRead____通道可讀開始"); NettyRequest nr = (NettyRequest)msg; System.out.println("server:收到的訊息____:"+nr); String datetime = LocalDateTime.now() .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS")); nr.setMsg(datetime); ctx.channel().writeAndFlush(nr); System.out.println("server:channelRead____通道可讀結束"); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("server:channelReadComplete____通道可讀完成 "); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("server:exceptionCaught____發生異常"); ctx.close(); } }
客戶端程式碼
package com.lyzx.netty.netty06; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; public class Client { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception{ ChannelHandler[] arr = {MarshallingCodeCFactory.marshallingDecoder(), MarshallingCodeCFactory.marshallingEncoder(), new ReadTimeoutHandler(30), new ClientHandler()}; ch.pipeline().addLast(arr); } }); ChannelFuture f = b.connect("127.0.0.1", 9988).sync(); f.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客戶端Handler
package com.lyzx.netty.netty06;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetAddress;
/**
 * Client-side inbound handler.
 *
 * <p>When the channel becomes active it sends the handshake
 * {@code <local ip>_<secret>}. On an {@code AUTH_SUCCESS} reply it starts a
 * {@link Scheduler} thread that sends heartbeats; every {@link NettyRequest}
 * received afterwards is printed as the server's response.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {
    private static final String AUTH_SUCCESS_FLAG = "AUTH_SUCCESS";
    private static final String AUTH_FAIL_FLAG = "AUTH_FAIL";

    /** Shared secret appended to the local IP in the handshake. */
    private final String auth_suffix = "abcd007";

    /** Sends the handshake as soon as the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:channelActive____通道啟用開始");
        String ip = InetAddress.getLocalHost().getHostAddress();
        String auth_key = ip + "_" + auth_suffix;
        ctx.channel().writeAndFlush(auth_key);
        System.out.println("client:channelActive____通道啟用結束");
    }

    /**
     * Handles the handshake reply ({@code String}) and heartbeat responses
     * ({@link NettyRequest}). Any other message type is reported and the
     * channel is closed.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof String) {
            if (AUTH_SUCCESS_FLAG.equals(msg)) {
                // Daemon thread: the heartbeat loop must not keep the JVM alive
                // once the connection is closed and main() has returned.
                Thread heartbeat = new Thread(new Scheduler(ctx), "client-heartbeat");
                heartbeat.setDaemon(true);
                heartbeat.start();
            } else {
                // Print what the server actually replied (normally AUTH_FAIL_FLAG),
                // then give up on this connection.
                System.out.println("========認證失敗:" + msg);
                ctx.close();
            }
        } else if (msg instanceof NettyRequest) {
            NettyRequest nr = (NettyRequest) msg;
            System.out.println("client____response time:" + nr);
        } else {
            System.out.println("client:unexpected message type:" + msg.getClass().getName());
            ctx.close();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:通道可讀完成");
    }

    /** Reports the failure (including its cause) and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client:發生異常");
        // Surface the cause and close the broken channel instead of leaving it open.
        System.err.println("client:exceptionCaught cause: " + cause);
        ctx.close();
    }
}
/**
 * Simulates periodic heartbeats: writes up to {@link #HEARTBEAT_COUNT}
 * {@link NettyRequest} messages to the channel, pausing
 * {@link #INTERVAL_MILLIS} milliseconds after each one. Stops early when the
 * channel is no longer active or the thread is interrupted.
 */
class Scheduler implements Runnable {
    /** Number of heartbeat messages sent per run. */
    private static final int HEARTBEAT_COUNT = 20;
    /** Pause after each heartbeat, in milliseconds. */
    private static final long INTERVAL_MILLIS = 2000L;

    private final ChannelHandlerContext ctx;

    /**
     * @param ctx context of the channel the heartbeats are written to
     */
    public Scheduler(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public void run() {
        for (int i = 0; i < HEARTBEAT_COUNT; i++) {
            // No point in writing to a connection that is already gone.
            if (!ctx.channel().isActive()) {
                return;
            }
            NettyRequest nr = new NettyRequest();
            nr.setId((long) i);
            nr.setCode(i);
            nr.setMsg("data_data:" + i);
            ctx.channel().writeAndFlush(nr);
            try {
                Thread.sleep(INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
其他工具類
package com.lyzx.netty.netty06;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
 * Factory for the Netty JBoss Marshalling encoder and decoder.
 *
 * @author alienware
 * @since 2014-12-16
 */
public final class MarshallingCodeCFactory {
    /** Name of the marshaller factory that is looked up ("serial"). */
    private static final String PROTOCOL = "serial";
    /** Version set on the marshalling configuration. */
    private static final int VERSION = 5;
    /** Maximum length of a single serialized message accepted by the decoder (1 MiB). */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    /** Static utility class; not meant to be instantiated. */
    private MarshallingCodeCFactory() {
    }

    /**
     * Creates the JBoss Marshalling decoder.
     *
     * @return a new {@link MarshallingDecoder} limited to {@link #MAX_OBJECT_SIZE} bytes per message
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingDecoder marshallingDecoder() {
        UnmarshallerProvider provider =
                new DefaultUnmarshallerProvider(serialFactory(), configuration());
        // Arguments: the provider and the maximum size of one serialized message.
        return new MarshallingDecoder(provider, MAX_OBJECT_SIZE);
    }

    /**
     * Creates the JBoss Marshalling encoder, which turns POJOs implementing
     * {@code Serializable} into their binary form.
     *
     * @return a new {@link MarshallingEncoder}
     * @throws IllegalStateException if the "serial" marshaller factory cannot be found
     */
    public static MarshallingEncoder marshallingEncoder() {
        MarshallerProvider provider =
                new DefaultMarshallerProvider(serialFactory(), configuration());
        return new MarshallingEncoder(provider);
    }

    /** Looks up the "serial" marshaller factory, failing fast when it is unavailable. */
    private static MarshallerFactory serialFactory() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory(PROTOCOL);
        if (factory == null) {
            // Without this check the failure would only show up later as an
            // opaque NullPointerException inside the codec.
            throw new IllegalStateException(
                    "No JBoss Marshalling factory named '" + PROTOCOL + "' was found");
        }
        return factory;
    }

    /** Builds the marshalling configuration shared by the encoder and the decoder. */
    private static MarshallingConfiguration configuration() {
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(VERSION);
        return configuration;
    }
}
通訊實體類
package com.lyzx.netty.netty06;
import java.io.Serializable;
/**
 * Serializable message bean with an id, a numeric code and a text message.
 */
public class NettyRequest implements Serializable {
    private Long id;
    private int code;
    private String msg;

    /** @return the id, or {@code null} if none was set */
    public Long getId() {
        return this.id;
    }

    /** @param id the id to store */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the numeric code */
    public int getCode() {
        return this.code;
    }

    /** @param code the numeric code to store */
    public void setCode(int code) {
        this.code = code;
    }

    /** @return the text message, or {@code null} if none was set */
    public String getMsg() {
        return this.msg;
    }

    /** @param msg the text message to store */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** Renders all three fields in the form {@code NettyRequest{id=…, code=…, msg='…'}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("NettyRequest{");
        text.append("id=").append(id);
        text.append(", code=").append(code);
        text.append(", msg='").append(msg).append('\'');
        text.append('}');
        return text.toString();
    }
}