Netty實現客戶端和服務端的通訊
阿新 • • 發佈:2019-02-13
Netty是JBoss出品(現由Netty社群維護)的又一個優秀的Socket框架,它和Apache的MINA出自同一位作者之手,所以很多思想是相同的。接下來,我們就來實現客戶端和服務端的雙向通訊。
首先,我們定義訊息型別:
/**
 * Kinds of messages exchanged between the client and the server.
 *
 * @author 李熠
 */
public enum MsgType {
    /** Heartbeat. */
    PING,

    /** A payload that should be delivered to another client. */
    SEND,

    /** Login request. */
    LOGIN,

    /** Reply telling the sender that the target client could not be found. */
    NO_TARGET
}
分別是心跳、傳送、登入、找不到目標。當客戶端和服務端連線後,需要向服務端傳送登入請求,也就是訊息型別:LOGIN;服務端接收到LOGIN請求後,會將客戶端加入到佇列中。
這類是定義的訊息Bean,向服務端傳送訊息就是傳送的這個物件的資料。
import java.io.Serializable;

/**
 * Message bean exchanged between client and server.
 *
 * <p>Plain mutable bean with a no-arg constructor and getter/setter pairs so
 * that it can be converted to and from JSON by the handlers.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -5756901646411393269L;

    /** ID of the sending client. */
    private String clientId;

    /** Message type. */
    private MsgType type;

    /** Payload. */
    private String data;

    /** ID of the target client. */
    private String targetId;

    /** Creates an empty message; all fields are {@code null}. */
    public Message() {
    }

    /**
     * Creates a message of the given type; all other fields are {@code null}.
     *
     * @param type the message type
     */
    public Message(MsgType type) {
        this.type = type;
    }

    public String getTargetId() {
        return targetId;
    }

    public void setTargetId(String targetId) {
        this.targetId = targetId;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public MsgType getType() {
        return type;
    }

    public void setType(MsgType type) {
        this.type = type;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    /**
     * Returns a readable dump of all fields, so that log statements which
     * concatenate a message print its content instead of an object hash.
     */
    @Override
    public String toString() {
        return "Message{clientId='" + clientId
                + "', type=" + type
                + ", data='" + data
                + "', targetId='" + targetId + "'}";
    }
}
接下來,實現客戶端佇列程式碼:
服務端:
import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Static registry that maps a client ID to the channel that client is
 * connected on.
 */
public class NettyChannelMap {

    private static final ConcurrentMap<String, SocketChannel> map = new ConcurrentHashMap<>();

    /**
     * Registers (or replaces) the channel for a client.
     *
     * @param clientId the client ID; must not be {@code null}
     * @param channel  the client's channel; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public static void add(String clientId, SocketChannel channel) {
        if (clientId == null) {
            throw new NullPointerException("clientId must not be null");
        }
        if (channel == null) {
            throw new NullPointerException("channel must not be null");
        }
        map.put(clientId, channel);
    }

    /**
     * Looks up the channel registered for a client.
     *
     * @param clientId the client ID; may be {@code null}
     * @return the registered channel, or {@code null} if there is none or
     *         {@code clientId} is {@code null}
     */
    public static Channel get(String clientId) {
        // ConcurrentHashMap rejects null keys, so answer "not registered" instead of throwing.
        if (clientId == null) {
            return null;
        }
        return map.get(clientId);
    }

    /**
     * Removes every registration that points at the given channel instance.
     *
     * @param channel the channel to unregister; {@code null} is ignored
     */
    public static void remove(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        for (Map.Entry<String, SocketChannel> entry : map.entrySet()) {
            if (entry.getValue() == channel) {
                // Conditional remove: if the same client ID was re-registered with a
                // newer channel in the meantime, that newer mapping is left alone.
                map.remove(entry.getKey(), channel);
            }
        }
    }
}
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.nio.charset.Charset; public class NettyServer { private int port; public SocketChannel socketChannel; public NettyServer(int port) throws InterruptedException { this.port = port; bind(); } private void bind() throws InterruptedException { EventLoopGroup boss=new NioEventLoopGroup(); EventLoopGroup worker=new NioEventLoopGroup(); ServerBootstrap bootstrap=new ServerBootstrap(); bootstrap.group(boss,worker); bootstrap.channel(NioServerSocketChannel.class); bootstrap.option(ChannelOption.SO_BACKLOG, 128); //通過NoDelay禁用Nagle,使訊息立即發出去,不用等待到一定的資料量才發出去 bootstrap.option(ChannelOption.TCP_NODELAY, true); //保持長連線狀態 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); //字串類解析
//這裡只能新增字串的編碼和解碼器,
//網上有很多例子是這樣寫的:
//這種寫法只能所有客戶端都用netty寫,否則其他框架實現的客戶端無法傳送訊息到服務端,因為他是轉換的netty自己的Object
//p.addLast(new ObjectEncoder());
//p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
p.addLast(new StringEncoder(Charset.forName("UTF-8")));
p.addLast(new StringDecoder(Charset.forName("UTF-8")));
p.addLast(new NettyServerHandler());
}
});
ChannelFuture f= bootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("server start---------------");
}
}
public static void main(String []args) throws InterruptedException {
if(args.length == 0){
new NettyServer(9999);
}else{
new NettyServer(Integer.parseInt(args[0]));
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import cn.sunsharp.netty.bean.Message;
import cn.sunsharp.netty.bean.MsgType;
import cn.sunsharp.netty.bean.NettyChannelMap;
import com.alibaba.fastjson.JSON;
//最好繼承SimpleChannelInboundHandler<String>,表示傳遞字串訊息,handler會把json格式的字串轉換為Message物件
public class NettyServerHandler extends SimpleChannelInboundHandler<String>{@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //channel失效,從Map中移除 NettyChannelMap.remove((SocketChannel)ctx.channel()); }@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {//cause.printStackTrace();System.out.println("出現異常!");}@Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg)throws Exception {System.out.println(msg);Message message = JSON.parseObject(msg+"", Message.class);System.out.println("接收到訊息:"+message);String clientId = message.getClientId();if(MsgType.LOGIN.equals(message.getType())){System.out.printf("將%s新增到佇列\n",clientId); NettyChannelMap.add(clientId,(SocketChannel)ctx.channel()); }else{ if(NettyChannelMap.get(clientId)==null){ System.out.printf("登入失敗,請重新登入!",clientId); //說明未登入,或者連線斷了,伺服器向客戶端發起登入請求,讓客戶端重新登入 message = new Message(MsgType.LOGIN); ctx.channel().writeAndFlush(JSON.toJSONString(message)); } } switch (message.getType()){ case PING:{ message = new Message(MsgType.PING); NettyChannelMap.get(clientId).writeAndFlush(JSON.toJSONString(message)); }break; case SEND:{ //收到客戶端的請求,傳送給targetId System.out.println("傳送訊息:"+message); if(NettyChannelMap.get(message.getTargetId()) != null){ NettyChannelMap.get(message.getTargetId()).writeAndFlush(JSON.toJSONString(message)); }else{ message.setType(MsgType.NO_TARGET); NettyChannelMap.get(clientId).writeAndFlush(JSON.toJSONString(message)); } }break; default:break; }}}客戶端可以使用任何框架任何語言的Socket來連線併發送訊息,為了方便,這裡依然用Netty來實現客戶端:import java.nio.charset.Charset;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import cn.sunsharp.regulation.bean.Message;
import cn.sunsharp.regulation.bean.MsgType;
import com.alibaba.fastjson.JSON;
/**
 * Netty client that connects to the server and exchanges UTF-8 encoded JSON
 * strings with it.
 *
 * <p>The constructor blocks while connecting. If every connection attempt
 * fails, {@link #socketChannel} stays {@code null}.
 */
public class NettyClient {

    /** How many times the connection is attempted before giving up. */
    private static final int MAX_CONNECT_ATTEMPTS = 5;

    /** Pause between two connection attempts, in milliseconds. */
    private static final long RETRY_DELAY_MILLIS = 3000L;

    private final int port;
    private final String host;

    /** The connected channel, or {@code null} if no connection could be established. */
    public SocketChannel socketChannel;

    /**
     * Creates the client and tries to connect immediately.
     *
     * @param port the server port
     * @param host the server host name or address
     */
    public NettyClient(int port, String host) {
        this.port = port;
        this.host = host;
        start();
    }

    /**
     * Connects to the server, retrying up to {@link #MAX_CONNECT_ATTEMPTS} times
     * on one shared event loop group. The group is shut down when no connection
     * could be established, so failed attempts do not leak threads.
     */
    private void start() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Reader idle after 20s, writer idle after 10s, all-idle check disabled.
                ch.pipeline().addLast(new IdleStateHandler(20, 10, 0));
                ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new NettyClientHandler());
            }
        });

        for (int attempt = 1; attempt <= MAX_CONNECT_ATTEMPTS; attempt++) {
            try {
                ChannelFuture future = bootstrap.connect(host, port).sync();
                if (future.isSuccess()) {
                    socketChannel = (SocketChannel) future.channel();
                    System.out.println("connect server 成功---------");
                    return;
                }
                System.out.println("連線失敗!");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // sync() rethrows the cause of a failed connect; report it and retry.
                System.out.println("連線失敗!" + e);
            }

            if (attempt < MAX_CONNECT_ATTEMPTS) {
                System.out.println("準備重連!");
                try {
                    Thread.sleep(RETRY_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        // No connection was established: release the event loop threads.
        eventLoopGroup.shutdownGracefully();
    }

    /** Connects to the server and sends a LOGIN message for client "001". */
    public static void main(String[] args) throws InterruptedException {
        NettyClient client = new NettyClient(9999, "192.168.1.38");
        if (client.socketChannel == null) {
            System.out.println("連線失敗!");
            return;
        }

        Message loginMsg = new Message(MsgType.LOGIN);
        loginMsg.setClientId("001");
        loginMsg.setTargetId("192.168.1.38");
        // The pipeline's StringEncoder only encodes character sequences, so the
        // message has to be serialized to a JSON String before it is written.
        client.socketChannel.writeAndFlush(JSON.toJSONString(loginMsg));
    }
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import cn.sunsharp.regulation.bean.Message;
import cn.sunsharp.regulation.bean.MsgType;
import com.alibaba.fastjson.JSON;
/**
 * Client-side handler: sends a heartbeat when the connection has been
 * write-idle and reacts to the JSON messages received from the server.
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {

    public static ChannelHandlerContext context = null;

    /** Client ID put on outgoing messages; {@code null} when none was configured. */
    private final String clientId;

    /** Creates a handler whose outgoing messages carry no client ID. */
    public NettyClientHandler() {
        this(null);
    }

    /**
     * Creates a handler that stamps its outgoing messages with a client ID.
     *
     * @param clientId the ID of this client; may be {@code null}
     */
    public NettyClientHandler(String clientId) {
        this.clientId = clientId;
    }

    /** Uses the writer-idle event to send a heartbeat message to the server. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    // Serialize to a JSON String: the pipeline's StringEncoder only
                    // encodes character sequences.
                    ctx.writeAndFlush(JSON.toJSONString(newMessage(MsgType.PING)));
                    System.out.println("send ping to server----------");
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Handles one decoded string message from the server. Text that does not
     * yield a message with a type is logged and ignored.
     *
     * @param ctx the channel context
     * @param msg the JSON text of the message
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        Message message = JSON.parseObject(msg + "", Message.class);
        MsgType msgType = message == null ? null : message.getType();
        if (msgType == null) {
            System.out.println("忽略無效訊息:" + msg);
            return;
        }

        switch (msgType) {
            case LOGIN:
                // The server asked for a login: send a login message.
                ctx.writeAndFlush(JSON.toJSONString(newMessage(MsgType.LOGIN)));
                break;
            case PING:
                System.out.println("receive ping from server----------");
                break;
            case SEND:
                // A message delivered by the server.
                System.out.println("收到服務端訊息:" + message.getData());
                break;
            case NO_TARGET:
                // The server could not find the target client.
                System.out.println("找不到targetId:" + message.getTargetId());
                break;
            default:
                break;
        }
    }

    /** Creates an outgoing message of the given type carrying this handler's client ID. */
    private Message newMessage(MsgType type) {
        Message message = new Message(type);
        message.setClientId(clientId);
        return message;
    }
}