SSM(二):Netty接收資料並存入資料庫出現數據接收不完全的情況
阿新 • • 發佈:2018-12-02
1.NettyServerStart
public class NettyServerStart { @Autowired public NettyServerStart(final NettyServer nettyServer) { System.out.println("------------Spring自動載入 ---------"); System.out.println("------------啟動Netty服務 ---------"); //繫結埠 nettyServer.setServerPort(6667); ExecutorService executorService= Executors.newCachedThreadPool(); //執行nettyServer executorService.execute(nettyServer); } }
2.NettyServerInitialize
public class NettyServerInitialize extends ChannelInitializer<SocketChannel> { @Autowired private NettyServerHandler nettyServerHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(8192,12,2,0,0)); // pipeline.addLast("decoder", new CCYLengthFieldDecoder(8192,12,2,0,0)); // pipeline.addLast("decoder",new CNPCDecoder()); pipeline.addLast("handler", nettyServerHandler); System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連線上"); } }
3.NettyServerHandler
重點要說一下這個channelRead0方法裡面我遇到的問題。由於某些原因還未來得及系統學習Netty,但是又急需把資料收集起來存入資料庫。情況是這樣的:有一個1w多個位元組的資料需要我來解析出真實資料,但是每次用網路測試助手往埠傳送資料,收到的資料都會出現不完全的情況。剛開始,我也不知道啥狀況,於是對於那些不完整的資料採取的是不處理。後來才知道,TCP是不會出現丟包的,當資料長度過長的時候,路由器轉發的時候會對資料進行拆分,分多少次發,channelRead0這個方法就會執行多少次,也就是說當我們覺得收到的資料不完全的時候,不是資料丟失,只是channelRead0方法還在執行中,資料沒有收發完。解決辦法:自己建立一個動態陣列,用於每次執行channelRead0方法儲存資料,然後校驗首位末尾,校驗通過之後,解析完畢,立刻存入資料庫,然後清空動態陣列。下圖可以看到,隨著channelRead0不斷執行,動態陣列越來越長。
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
@Autowired
private CollectedDataService collectedDataService;
private Map<String, DynamicArray<Byte>> dynamicArrayMap = new HashMap<>();
private int defaultDynamicCapacity = 1024 * 1024;
/**
* A thread-safe Set Using ChannelGroup, you can categorize Channels into a meaningful group.
* A closed Channel is automatically removed from the collection,
*/
public static ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
// Broadcast a message to multiple Channels
channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
channels.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
}
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // (4)
ByteBuf in = (ByteBuf) msg;
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = insocket.getAddress().getHostAddress();
if(!dynamicArrayMap.containsKey(ip)){
dynamicArrayMap.put(ip,new DynamicArray<Byte>(defaultDynamicCapacity));
}
DynamicArray<Byte> dynamicArray = dynamicArrayMap.get(ip);
while (in.isReadable()) {
byte recByte = in.readByte();
dynamicArray.addLast(recByte);
}
System.out.println("------------------------------------");
System.out.println("dynamicArray長度:"+dynamicArray.getSize());
byte [] dataByte = new byte[dynamicArray.getSize()];
//從Byte[]獲得char[]
for (int i = 0;i<dynamicArray.getSize();i++) {
byte b = dynamicArray.get(i);
dataByte[i] = b;
}
char[] byteToChars = getChars(dataByte);
Integer validSuccessIndex = null;
//todo 校驗成功
if(byteToChars[0]=='0' && byteToChars[1] == '1'
&& byteToChars[byteToChars.length-2]== '0'
&& byteToChars[byteToChars.length-1] == '3'
) {
validSuccessIndex = byteToChars.length;
System.out.println("成功位:"+ validSuccessIndex);
System.out.println("成功位:"+ validSuccessIndex);
collectedDataService.addCnpcTankData(String.valueOf(byteToChars));
}
if(byteToChars[0]=='1' && byteToChars[1] == 'B'
&& byteToChars[byteToChars.length-2]== '0'
&& byteToChars[byteToChars.length-1] == 'D'
) {
validSuccessIndex = byteToChars.length;
System.out.println("成功位:"+ validSuccessIndex);
System.out.println("成功位:"+ validSuccessIndex);
collectedDataService.addCnpcGunData(String.valueOf(byteToChars));
System.out.println("清空dynamicArray之前:"+dynamicArray.getSize());
}
if(validSuccessIndex != null){
// Byte[] validBuffer = dynamicArray.leftShift(validSuccessIndex);
dynamicArray.leftShift(validSuccessIndex);
System.out.println("清空dynamicArray之後:"+dynamicArray.getSize());
//清空byteToChar
}
System.out.println("1st:"+byteToChars[0]+" 2nd:"+byteToChars[1]+
" 3rd:"+byteToChars[byteToChars.length-2]+" 4th:"+byteToChars[byteToChars.length-1]);
for (char byteTochar : byteToChars
) {
System.out.print(byteTochar);
}
// System.out.println("--------------------");
// System.out.println("dataByte.lenth:"+dataByte.length);
//ValidSuccessIndex不為空則校驗成功
}
byte[] toPrimitives(Byte[] oBytes)
{
byte[] bytes = new byte[oBytes.length];
for(int i = 0; i < oBytes.length; i++) {
bytes[i] = oBytes[i];
}
return bytes;
}
char[] getChars(byte[] bytes) {
Charset cs = Charset.forName("UTF-8");
ByteBuffer bb = ByteBuffer.allocate(bytes.length);
bb.put(bytes);
bb.flip();
CharBuffer cb = cs.decode(bb);
return cb.array();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "上線");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "掉線");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel incoming = ctx.channel();
System.out.println("Client:" + incoming.remoteAddress() + "異常");
// 當出現異常就關閉連線
//cause.printStackTrace();
ctx.close();
}
}
4.NettyServer
package com.cnpc.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Created by caihanbin on 2017/4/29.
*/
@Component
public class NettyServer implements Runnable {
@Autowired
private NettyServerInitialize nettyServerInitialize;
private int port;
//設定埠
public void setServerPort(int port) {
this.port = port;
}
public NettyServer(){}
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(nettyServerInitialize)
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// 繫結埠,開始接收進來的連線
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待伺服器 socket 關閉 。
// 在這個例子中,這不會發生,但你可以優雅地關閉你的伺服器。
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}