Netty之檔案傳輸
轉:https://blog.csdn.net/zbw18297786698/article/details/53678133
1、工程的目錄結構
2、GZIP進行壓縮和解壓的工具類
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
 * Helpers for compressing and decompressing in-memory byte arrays in GZIP
 * format.
 */
public class GzipUtils {

    /** Size of the scratch buffer used while inflating. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * Compresses the given bytes into GZIP format.
     *
     * @param data the raw bytes to compress; must not be {@code null}
     * @return the GZIP-compressed representation of {@code data}
     * @throws Exception if the compression stream reports an I/O error
     */
    public static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the GZIP stream finishes the deflater (writing the trailer)
        // and releases it even when write() fails.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(data);
        }
        return bos.toByteArray();
    }

    /**
     * Decompresses GZIP-formatted bytes.
     *
     * @param data GZIP-compressed bytes; must not be {@code null}
     * @return the decompressed bytes
     * @throws Exception if {@code data} is not valid GZIP content or reading
     *                   fails
     */
    public static byte[] ungzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(
                new ByteArrayInputStream(data))) {
            byte[] buf = new byte[BUFFER_SIZE];
            int num;
            while ((num = gzip.read(buf, 0, buf.length)) != -1) {
                bos.write(buf, 0, num);
            }
        }
        return bos.toByteArray();
    }

    /**
     * Manual round-trip check: reads {@code sources/005.jpg} below the working
     * directory, compresses and restores it, prints the three sizes and writes
     * the restored bytes to {@code receive/005.jpg}.
     *
     * @param args unused
     * @throws Exception if the file cannot be read or written
     */
    public static void main(String[] args) throws Exception {
        // Read the file
        String readPath = System.getProperty("user.dir") + File.separatorChar
                + "sources" + File.separatorChar + "005.jpg";
        File file = new File(readPath);
        // available() followed by a single read() is not guaranteed to return
        // the whole file, so read it completely instead.
        byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
        System.out.println("檔案原始大小:" + data.length);
        // Compress
        byte[] ret1 = GzipUtils.gzip(data);
        System.out.println("壓縮之後大小:" + ret1.length);
        // Restore
        byte[] ret2 = GzipUtils.ungzip(ret1);
        System.out.println("還原之後大小:" + ret2.length);
        // Write the restored file
        String writePath = System.getProperty("user.dir") + File.separatorChar
                + "receive" + File.separatorChar + "005.jpg";
        try (FileOutputStream fos = new FileOutputStream(writePath)) {
            fos.write(ret2);
        }
    }
}
3、在客戶端的請求Request中,加入要傳輸的檔案(private byte[] attachment)
import java.io.Serializable;
/**
 * Serializable message sent from the client to the server. Besides the
 * textual fields it carries the file content to transfer in
 * {@code attachment}.
 */
public class Request implements Serializable {

    private static final long serialVersionUID = -2813211330451521507L;

    /** Request identifier. */
    private String id;
    /** Name associated with the request. */
    private String name;
    /** Free-form message text. */
    private String requestMessage;
    /** Bytes of the file being transferred. */
    private byte[] attachment;

    /** @return the request identifier */
    public String getId() {
        return this.id;
    }

    /** @param id the request identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the name associated with the request */
    public String getName() {
        return this.name;
    }

    /** @param name the name associated with the request */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message text */
    public String getRequestMessage() {
        return this.requestMessage;
    }

    /** @param requestMessage the message text */
    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    /** @return the stored attachment array itself (no copy is made) */
    public byte[] getAttachment() {
        return this.attachment;
    }

    /** @param attachment the attachment array; stored as-is (no copy is made) */
    public void setAttachment(byte[] attachment) {
        this.attachment = attachment;
    }
}
4、服務端的Response
import java.io.Serializable;
/**
 * Serializable message the server sends back to the client.
 */
public class Response implements Serializable {

    private static final long serialVersionUID = -5640678664176009458L;

    /** Response identifier. */
    private String id;
    /** Name associated with the response. */
    private String name;
    /** Free-form message text. */
    private String responseMessage;

    /** @return the response identifier */
    public String getId() {
        return this.id;
    }

    /** @param id the response identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the name associated with the response */
    public String getName() {
        return this.name;
    }

    /** @param name the name associated with the response */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message text */
    public String getResponseMessage() {
        return this.responseMessage;
    }

    /** @param responseMessage the message text */
    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    /**
     * Renders all three fields in the form
     * {@code Response [id=..., name=..., responseMessage=...]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Response [id=");
        text.append(this.id);
        text.append(", name=").append(this.name);
        text.append(", responseMessage=").append(this.responseMessage);
        text.append("]");
        return text.toString();
    }
}
5、Jboss序列化工具
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
 * Factory for the Netty encoder and decoder that serialize messages with
 * JBoss Marshalling.
 */
public final class MarshallingCodeCFactory {

    /** Name of the marshaller factory to look up ("serial" selects Java serialization). */
    private static final String FACTORY_NAME = "serial";

    /** Version number set on every marshalling configuration. */
    private static final int CONFIGURATION_VERSION = 5;

    /** Maximum length of a single serialized message accepted by the decoder. */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024 * 1;

    /**
     * Creates a JBoss Marshalling decoder.
     *
     * @return a new MarshallingDecoder limited to {@code MAX_OBJECT_SIZE} per message
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        UnmarshallerProvider unmarshallers = new DefaultUnmarshallerProvider(
                lookupFactory(), newConfiguration());
        return new MarshallingDecoder(unmarshallers, MAX_OBJECT_SIZE);
    }

    /**
     * Creates a JBoss Marshalling encoder, which turns serializable POJOs
     * into their binary form.
     *
     * @return a new MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerProvider marshallers = new DefaultMarshallerProvider(
                lookupFactory(), newConfiguration());
        return new MarshallingEncoder(marshallers);
    }

    /** Obtains the marshaller factory through the static lookup on Marshalling. */
    private static MarshallerFactory lookupFactory() {
        return Marshalling.getProvidedMarshallerFactory(FACTORY_NAME);
    }

    /** Builds a fresh configuration carrying the fixed version number. */
    private static MarshallingConfiguration newConfiguration() {
        MarshallingConfiguration settings = new MarshallingConfiguration();
        settings.setVersion(CONFIGURATION_VERSION);
        return settings;
    }
}
6、服務端的實現
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Netty server that accepts connections and hands each one to a
 * {@code ServerHandler} behind the JBoss Marshalling codec.
 */
public class Server {

    public Server() {
    }

    /**
     * Binds to the given port and blocks until the listening channel is
     * closed; both event loop groups are shut down on the way out.
     *
     * @param port the TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void bind(int port) throws Exception {
        // NIO thread groups: one for accepting, one for handling connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Server bootstrap configuration
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChildChannelHandler())
                    // SO_BACKLOG is the length of the pending-connection
                    // queue, not a TCP buffer size.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // Bind the port and wait synchronously for the bind to succeed
            ChannelFuture f = b.bind(port).sync();
            // Wait until the server's listening channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Release thread resources gracefully
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Sets up the pipeline of every accepted connection. Declared static
     * because it uses no state of the enclosing Server instance.
     */
    private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // JBoss Marshalling encoder and decoder
            ch.pipeline().addLast(
                    MarshallingCodeCFactory.buildMarshallingEncoder());
            ch.pipeline().addLast(
                    MarshallingCodeCFactory.buildMarshallingDecoder());
            // Application handler for network I/O
            ch.pipeline().addLast(new ServerHandler());
        }
    }

    /**
     * Starts the server on port 9999.
     *
     * @param args unused
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        new Server().bind(9999);
    }
}
7、服務端的Handler的實現
import java.io.File;
import java.io.FileOutputStream;
import com.netty.utils.GzipUtils;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Server-side handler: restores the compressed attachment of each incoming
 * {@code Request}, stores it in the "receive" directory below the working
 * directory and answers with a {@code Response}.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    /**
     * Handles one message sent by the client.
     *
     * @param ctx the channel context used to write the response
     * @param msg the decoded message; cast to {@code Request}
     * @throws Exception if the attachment cannot be decompressed or stored,
     *                   or if the request carries no usable file name
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        Request req = (Request) msg;
        System.out.println("Server : " + req.getId() + ", " + req.getName()
                + ", " + req.getRequestMessage());
        // Restore the compressed file content
        byte[] attachment = GzipUtils.ungzip(req.getAttachment());
        // Target directory; mkdirs() does nothing when it already exists, and
        // a failure to create it surfaces when the stream is opened below.
        File dir = new File(System.getProperty("user.dir"), "receive");
        dir.mkdirs();
        // The name comes from the remote peer: keep only its last path
        // segment so the file cannot be written outside the target directory.
        File target = new File(dir, baseName(req.getName()));
        // Store the file; the stream is closed even when the write fails
        try (FileOutputStream fos = new FileOutputStream(target)) {
            fos.write(attachment);
        }
        // Answer the client; no close listener is attached, so the channel
        // stays open after the response is written.
        Response resp = new Response();
        resp.setId(req.getId());
        resp.setName("resp" + req.getId());
        resp.setResponseMessage("響應內容" + req.getId());
        ctx.writeAndFlush(resp);
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the channel context to close
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // Report the cause instead of dropping it silently
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Strips any directory part (either separator style) from a file name
     * supplied by the client.
     *
     * @param name the client-supplied name
     * @return the last path segment of {@code name}
     * @throws IllegalArgumentException if the name is missing, empty after
     *                                  stripping, or a "." / ".." segment
     */
    private static String baseName(String name) {
        if (name == null) {
            throw new IllegalArgumentException("file name is missing");
        }
        int cut = Math.max(name.lastIndexOf('/'), name.lastIndexOf('\\'));
        String base = name.substring(cut + 1);
        if (base.isEmpty() || ".".equals(base) || "..".equals(base)) {
            throw new IllegalArgumentException("illegal file name: " + name);
        }
        return base;
    }
}
8、客戶端的實現
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Netty client that connects to the server and hands the connection to a
 * {@code ClientHandler} behind the JBoss Marshalling codec.
 */
public class Client {

    /**
     * Connects to the server and blocks until the connection is closed; the
     * event loop group is shut down on the way out.
     *
     * @param port the server port
     * @param host the server host name or address
     * @throws Exception if connecting fails or the waiting thread is interrupted
     */
    public void connect(int port, String host) throws Exception {
        // Client NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Client bootstrap configuration
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new MyChannelHandler());
            // Connect asynchronously, then wait for the connection to succeed
            ChannelFuture f = b.connect(host, port).sync();
            // Wait until the connection is closed
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
            System.out.println("客戶端優雅的釋放了執行緒資源...");
        }
    }

    /**
     * Sets up the pipeline of the client connection. Declared static because
     * it uses no state of the enclosing Client instance.
     */
    private static class MyChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // JBoss Marshalling encoder and decoder
            ch.pipeline().addLast(
                    MarshallingCodeCFactory.buildMarshallingEncoder());
            ch.pipeline().addLast(
                    MarshallingCodeCFactory.buildMarshallingDecoder());
            // Application handler for network I/O
            ch.pipeline().addLast(new ClientHandler());
        }
    }

    /**
     * Connects to 127.0.0.1 on port 9999.
     *
     * @param args unused
     * @throws Exception if the client cannot connect
     */
    public static void main(String[] args) throws Exception {
        new Client().connect(9999, "127.0.0.1");
    }
}
9、客戶端的Handler的實現
import java.io.File;
import java.io.FileInputStream;
import com.netty.utils.GzipUtils;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
 * Client-side handler: sends the files once the connection is up and prints
 * the responses received from the server.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /**
     * Called when the connection to the server has been established. Sends
     * 001.jpg through 005.jpg from the "sources" directory below the working
     * directory, each compressed and wrapped in a {@code Request}.
     *
     * @param ctx the channel context used to send the requests
     * @throws Exception if a file cannot be read or compressed
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 5; i++) {
            String fileName = "00" + (i + 1) + ".jpg";
            // Locate the file to send
            String path = System.getProperty("user.dir") + File.separatorChar
                    + "sources" + File.separatorChar + fileName;
            File file = new File(path);
            Request req = new Request();
            req.setId("" + i);
            req.setName(file.getName());
            req.setRequestMessage("資料資訊" + i);
            // available() followed by a single read() is not guaranteed to
            // return the whole file, so read it completely instead.
            byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
            // Compress the content
            req.setAttachment(GzipUtils.gzip(data));
            // Send the request to the server
            ctx.channel().writeAndFlush(req);
        }
    }

    /**
     * Prints a response received from the server. The message is only read
     * here and not passed on, so it is released manually afterwards.
     *
     * @param ctx the channel context
     * @param msg the decoded message; cast to {@code Response}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        try {
            Response response = (Response) msg;
            System.out.println(response);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the channel context to close
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        // Report the cause instead of dropping it silently
        cause.printStackTrace();
        ctx.close();
    }
}