Netty Examples: A Collection of Small Hands-On Netty Examples
Crazy Maker Circle (瘋狂創客圈): Java distributed chat room (hundred-million-level traffic) hands-on series, part 18
Source project
Link to the IDEA source project: Java chat room hands-on source code
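Netty is a network framework built on JDK NIO.
It simplifies NIO programming: the application no longer has to maintain the selector itself, and network communication is separated from data processing.
It is mostly used for low-level data communication and for heartbeat detection (keepalive).
The examples below use the Netty 5.0 alpha API (ChannelHandlerAdapter, messageReceived); on Netty 4.x the corresponding names are ChannelInboundHandlerAdapter and channelRead0.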
1. Data communication
1.1 Hello World
public class Server {
    public static void main(String[] args) throws Exception {
        // 1. Create two event loop groups:
        //    one accepts client connections on the server side,
        //    the other handles network I/O (reads and writes).
        EventLoopGroup pGroup = new NioEventLoopGroup();
        EventLoopGroup cGroup = new NioEventLoopGroup();
        // 2. Create the helper class ServerBootstrap, used to configure the server channel.
        ServerBootstrap b = new ServerBootstrap();
        b.group(pGroup, cGroup) // bind the two groups
         .channel(NioServerSocketChannel.class) // NIO transport: NioServerSocketChannel is TCP, NioDatagramChannel is UDP
         .option(ChannelOption.SO_BACKLOG, 1024) // length of the pending-connection queue (not a TCP buffer)
         .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // send buffer size of each accepted connection
         .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // receive buffer size of each accepted connection
         .childOption(ChannelOption.SO_KEEPALIVE, true) // keep accepted connections alive
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel sc) throws Exception { // pipeline of the SocketChannel once the connection is established
                 // 3. Configure the data-processing logic here; addLast can be called several times.
                 sc.pipeline().addLast(new ServerHandler());
             }
         });
        // 4. Bind the port. bind returns a future (asynchronous); sync blocks until the bind completes.
        ChannelFuture cf1 = b.bind(8765).sync();
        // ChannelFuture cf2 = b.bind(8764).sync(); // several ports can be bound
        // 5. Wait for shutdown; sync blocks until the server channel is closed.
        cf1.channel().closeFuture().sync();
        // cf2.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();
    }
}
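SO_BACKLOG in detail:
The TCP stack in the server's kernel maintains two queues, A (half-open connections) and B (established connections).
When the client calls connect, it sends a SYN (first step of the handshake).
On receiving the SYN, the server replies with SYN+ACK (second step) and the kernel puts the connection into queue A.
On receiving that, the client sends an ACK (third step); the kernel moves the connection from A to B, where it waits until accept takes it out and hands it to the application.
In the classic BSD model the backlog bounds the combined length of A and B; on Linux since kernel 2.2 it bounds only queue B. When accept cannot keep up and the queue fills, the kernel drops or refuses new client connections.
Raising the backlog eases this; an empirical value is around 100.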
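To see where the backlog ends up below Netty, here is a minimal JDK-only sketch. The class name BacklogSketch and the method boundPort are made up for illustration; the three-argument ServerSocket constructor is the plain JDK way to pass the same hint that ChannelOption.SO_BACKLOG carries. The value is only a request, and the operating system may cap it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

public class BacklogSketch {
    // Opens a listening socket on the loopback interface with the given
    // backlog hint and returns the port the OS assigned to it.
    public static int boundPort(int backlog) {
        // Port 0 asks the OS for any free port; try-with-resources closes the socket.
        try (ServerSocket server = new ServerSocket(0, backlog, InetAddress.getLoopbackAddress())) {
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("listened on port " + boundPort(100));
    }
}
```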
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
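ByteBuf buf = (ByteBuf) msg;
try {
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body );
String response = "Response to client: " + body ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes("utf-8")));
// A listener fires once the future completes. With the listener below the connection is closed right after the write (short connection), so the close is initiated by the server. To close directly, call ctx[.channel()].close()
//.addListener(ChannelFutureListener.CLOSE);
} finally {
// msg is only read here (a new buffer is written), so the inbound buffer has to be released by hand
ReferenceCountUtil.release(msg);
}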
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("read complete");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
//ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); // several ports can be used
// Send messages as buffers. write only sends after a flush; writeAndFlush does both in one call
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
Thread.sleep(2000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
//cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes()));
cf1.channel().closeFuture().sync();
//cf2.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Client :" + body );
} finally {
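// Remember to release the msg argument of handler methods: when the data is written (write), the msg reference is released automatically; when the data is only read, the reference count must be released by hand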
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
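1.2 Packet splitting and sticking
TCP/IP guarantees delivery and ordering, but TCP is a byte stream, so the application still has to deal with messages being split apart or stuck together:
where are the message boundaries in the bytes received? The basic solutions:
1) A special character sequence as the end delimiter.
2) Fixed-length messages: fix the message length and pad short messages with spaces. The receiver has to trim, which is inefficient, so this is not recommended.
3) A custom protocol whose header carries a field with the total message length. Worth considering when security is required.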
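Solutions 1 and 2 get Netty examples in the following sections; solution 3 can be sketched with the JDK alone. The class below is a hypothetical illustration, not a Netty API: it assumes a 4-byte big-endian length header followed by a UTF-8 payload, and it does not validate the length field, which a real protocol should do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LengthFieldFraming {
    // Prefix the payload with its length as a 4-byte big-endian integer.
    public static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Extract every complete frame from the accumulated bytes.
    // Incomplete trailing bytes stay in the buffer for the next call.
    public static List<String> decode(ByteBuffer accumulated) {
        List<String> messages = new ArrayList<>();
        accumulated.flip(); // switch from writing to reading
        while (accumulated.remaining() >= 4) {
            accumulated.mark();
            int length = accumulated.getInt();
            if (accumulated.remaining() < length) {
                accumulated.reset(); // header is there, body is not complete yet
                break;
            }
            byte[] payload = new byte[length];
            accumulated.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        accumulated.compact(); // keep the leftover bytes, switch back to writing
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        byte[] a = encode("hello");
        byte[] b = encode("netty");
        // First "read": all of a stuck together with the first 3 bytes of b.
        buffer.put(a).put(b, 0, 3);
        System.out.println(decode(buffer)); // [hello]
        // Second "read": the rest of b arrives.
        buffer.put(b, 3, b.length - 3);
        System.out.println(decode(buffer)); // [netty]
    }
}
```

The receiver never has to guess: it either has the full number of bytes the header announced, or it waits for more.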
Special-character delimiter
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_SNDBUF, 32*1024) // socket buffers belong to the accepted connections, hence childOption
.childOption(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// Use DelimiterBasedFrameDecoder with $_ as the end delimiter
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
// Decode to String. After StringDecoder, the msg received in the handler callbacks is a String (no longer a ByteBuf). Writes still have to pass a ByteBuf
sc.pipeline().addLast(new StringDecoder());
// data-processing logic
sc.pipeline().addLast(new ServerHandler());
}
});
// 4. Bind the port
ChannelFuture cf = b.bind(8765).sync();
// Wait until the server's listening port is closed
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server :" + msg);
String response = "Server response: " + msg + "$_";
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes()));
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes()));
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
String response = (String) msg;
System.out.println("Client: " + response);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
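What the delimiter decoder does can be pictured with a small JDK-only sketch. DelimiterFraming is a hypothetical class, not Netty's DelimiterBasedFrameDecoder: it works on characters instead of bytes and skips the maximum frame length check (the 1024 above), so treat it as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterFraming {
    private final String delimiter;
    // Data received so far that has not been terminated by a delimiter yet.
    private final StringBuilder pending = new StringBuilder();

    public DelimiterFraming(String delimiter) {
        this.delimiter = delimiter;
    }

    // Feed one received chunk; returns the frames it completes, delimiter stripped.
    public List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> frames = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(delimiter)) >= 0) {
            frames.add(pending.substring(0, end));
            pending.delete(0, end + delimiter.length());
        }
        return frames;
    }

    public static void main(String[] args) {
        DelimiterFraming framing = new DelimiterFraming("$_");
        // "bbbb$_" and "cccc$_" arrive split in the middle of the second message.
        System.out.println(framing.feed("bbbb$_cc")); // [bbbb]
        System.out.println(framing.feed("cc$_"));     // [cccc]
    }
}
```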
Fixed length
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_SNDBUF, 32*1024) // socket buffers belong to the accepted connections, hence childOption
.childOption(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// Fixed-length frames of 5 bytes: the decoder accumulates 5 bytes before it passes a frame on to the next handler
sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
// decode to String
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String request = (String)msg;
System.out.println("Server :" + msg);
String response = request ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new FixedLengthFrameDecoder(5));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes()));
cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes()));
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String response = (String) msg;
System.out.println("Client: " + response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
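The client above writes "aaa" (3 bytes) and then "bbccccc" (7 bytes), yet with a frame length of 5 the server handler sees "aaabb" and "ccccc". The JDK-only sketch below replays that regrouping. FixedLengthFraming is a hypothetical class written for this illustration, not Netty's FixedLengthFrameDecoder, and it assumes ASCII content.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FixedLengthFraming {
    private final int frameLength;
    // Bytes received so far that do not fill a whole frame yet.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    public FixedLengthFraming(int frameLength) {
        this.frameLength = frameLength;
    }

    // Feed one received chunk; returns every frame that is now complete.
    public List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] all = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int offset = 0;
        while (all.length - offset >= frameLength) {
            frames.add(new String(all, offset, frameLength, StandardCharsets.US_ASCII));
            offset += frameLength;
        }
        // Keep only the bytes that did not fit into a frame.
        pending.reset();
        pending.write(all, offset, all.length - offset);
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFraming framing = new FixedLengthFraming(5);
        System.out.println(framing.feed("aaa".getBytes(StandardCharsets.US_ASCII)));     // []
        System.out.println(framing.feed("bbccccc".getBytes(StandardCharsets.US_ASCII))); // [aaabb, ccccc]
    }
}
```

Message boundaries chosen by the sender play no role; only the byte count does, which is why short messages must be padded by the sender.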
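1.3 Encoding and decoding
That is, object serialization, whose purpose is to send objects over the network and to persist them locally.
Java's built-in serialization produces a large byte stream, so frameworks such as JBoss Marshalling, Kryo, or Protobuf are commonly used instead.
The example below uses a codec to transfer JavaBeans (with Marshalling the JavaBean has to implement Serializable) and gzip-compresses the attachment carried in the message.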
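To get a feeling for how large the built-in serialization stream is, the JDK-only sketch below serializes a tiny bean and compares the result with the raw field data. SerializationSize and its nested Message class are hypothetical, a cut-down stand-in for the Request bean; the exact byte count depends on the class and package names, so none is quoted here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class SerializationSize {
    // Hypothetical bean with two string fields.
    public static class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;

        public Message(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Serialize with the JDK's built-in ObjectOutputStream.
    public static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Size of the field contents alone, encoded as UTF-8.
    public static int payloadBytes(Message m) {
        return m.id.getBytes(StandardCharsets.UTF_8).length
             + m.name.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        Message m = new Message("1", "req1");
        System.out.println("field data: " + payloadBytes(m) + " bytes");
        System.out.println("serialized: " + serialize(m).length + " bytes");
    }
}
```

The difference comes from the stream header and the class descriptor (class name, field names, and types) that are written along with the values.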
Custom codec
public final class MarshallingCodeCFactory {
// Decoder
public static MarshallingDecoder buildMarshallingDecoder() {
// Create the factory; the argument "serial" selects the factory for Java object serialization
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
// Create the configuration, version 5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// Build the unmarshaller provider from the factory and the configuration
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// Create the decoder. The first argument is the provider, the second the maximum serialized size of a single message; larger messages are rejected
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
// Encoder
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
// Create the encoder, which serializes JavaBeans implementing Serializable into a byte array
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
JavaBeans
public class Request implements Serializable { // Serializable marker interface
private String id ;
private String name ;
private String requestMessage ;
private byte[] attachment;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}
public byte[] getAttachment() {
return attachment;
}
public void setAttachment(byte[] attachment) {
this.attachment = attachment;
}
}
public class Response implements Serializable { // Serializable marker interface
private String id;
private String name;
private String responseMessage;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
}
GZip compression utility
public class GzipUtils {
public static byte[] gzip(byte[] data) throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(bos);
gzip.write(data);
gzip.finish();
gzip.close();
byte[] ret = bos.toByteArray();
bos.close();
return ret;
}
public static byte[] ungzip(byte[] data) throws Exception{
ByteArrayInputStream bis = new ByteArrayInputStream(data);
GZIPInputStream gzip = new GZIPInputStream(bis);
byte[] buf = new byte[1024];
int num = -1;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
while((num = gzip.read(buf)) != -1 ){
bos.write(buf, 0, num);
}
gzip.close();
bis.close();
byte[] ret = bos.toByteArray();
bos.close();
return ret;
}
}
Server and client
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// enable logging
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
// Add the codec. Custom types are sent, and the msg argument received by the handler methods is then an instance of the corresponding custom class
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request req = (Request)msg;
System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage());
byte[] attachment = GzipUtils.ungzip(req.getAttachment());
String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg";
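// try-with-resources closes the stream even if the write fails; the "receive" directory must already exist
try (FileOutputStream fos = new FileOutputStream(path)) {
fos.write(attachment);
}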
Response resp = new Response();
resp.setId(req.getId());
resp.setName("resp" + req.getId());
resp.setResponseMessage("response content " + req.getId());
ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
for(int i = 0; i < 5; i++){
Request req = new Request();
req.setId("" + i);
req.setName("req" + i);
req.setRequestMessage("data message " + i);
String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg";
File file = new File(path);
// Read the whole file; in.available() followed by a single read() is not guaranteed to return the full content
byte[] data = java.nio.file.Files.readAllBytes(file.toPath());
req.setAttachment(GzipUtils.gzip(data)); // compress
cf.channel().writeAndFlush(req);
}
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response) msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
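1.4 Long and short connections
1. Long connection: the connection is kept open and never actively closed, which gives good real-time behavior.
2. Short connection: data is buffered and submitted in one batch; the server closes the connection once it has received the data.
These two differ only in whether the ChannelFutureListener.CLOSE listener is added to the ChannelFuture returned by writeAndFlush.
3. Long connection that is closed after a period of inactivity, implemented by adding a ReadTimeoutHandler to the SocketChannel's pipeline. Example: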
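Before the Netty listing, the idea behind a read timeout can be reduced to a few lines of plain Java. ReadIdleTracker is a hypothetical class for illustration and not how ReadTimeoutHandler is implemented: it only remembers when data was last read and answers whether the limit has passed. Time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.concurrent.TimeUnit;

public class ReadIdleTracker {
    private final long timeoutNanos;
    private long lastReadNanos;

    // startNanos is the time the connection became active.
    public ReadIdleTracker(long timeout, TimeUnit unit, long startNanos) {
        this.timeoutNanos = unit.toNanos(timeout);
        this.lastReadNanos = startNanos;
    }

    // Call whenever data has been read from the peer.
    public void onRead(long nowNanos) {
        lastReadNanos = nowNanos;
    }

    // True once nothing has been read for at least the timeout.
    public boolean isTimedOut(long nowNanos) {
        return nowNanos - lastReadNanos >= timeoutNanos;
    }

    public static void main(String[] args) {
        long second = TimeUnit.SECONDS.toNanos(1);
        ReadIdleTracker tracker = new ReadIdleTracker(5, TimeUnit.SECONDS, 0);
        // Data arrives at 0 s, 4 s and 8 s, so the 5 s limit is never reached in between.
        tracker.onRead(4 * second);
        tracker.onRead(8 * second);
        System.out.println(tracker.isTimedOut(12 * second)); // false: only 4 s idle
        System.out.println(tracker.isTimedOut(13 * second)); // true: 5 s idle, time to close
    }
}
```

In the example that follows, the client sends every 4 seconds with a 5 second limit, so the connection stays open while requests flow and is closed shortly after the last one.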
public final class MarshallingCodeCFactory {
public static MarshallingDecoder buildMarshallingDecoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
public class Request implements Serializable{
private String id ;
private String name ;
private String requestMessage ;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}
}
public class Response implements Serializable{
private String id;
private String name;
private String responseMessage;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
}
}
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// enable logging
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ReadTimeoutHandler(5)); // 5 s limit: disconnect when nothing is read from the client in time
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
Response response = new Response();
response.setId(request.getId());
response.setName("response" + request.getId());
response.setResponseMessage("response content " + request.getId());
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class Client {
private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf ;
// Singleton
private static class SingletonHolder {
static final Client instance = new Client();
}
public static Client getInstance(){
return SingletonHolder.instance;
}
private Client(){
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
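.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// A second .handler(...) call replaces the first one, so the LoggingHandler is added to the pipeline here instead
sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
// Timeout handler: closes the channel when nothing has been read from the server for the given time
sc.pipeline().addLast(new ReadTimeoutHandler(5)); // 5 s limit: disconnect when nothing is read from the server in time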
sc.pipeline().addLast(new ClientHandler());
}
});
}
public void connect(){
try {
this.cf = b.connect("127.0.0.1", 8765).sync();
System.out.println("Connected to the remote server, ready to exchange data");
} catch (Exception e) {
e.printStackTrace();
}
}
public ChannelFuture getChannelFuture(){
if(this.cf == null) { // first connection
this.connect();
}
if(!this.cf.channel().isActive()){ // reconnect
this.connect();
}
return this.cf;
}
public static void main(String[] args) throws Exception{
final Client c = Client.getInstance();
ChannelFuture cf = c.getChannelFuture();
for(int i = 1; i <= 3; i++ ){
Request request = new Request();
request.setId("" + i);
request.setName("request" + i);
request.setRequestMessage("data message " + i);
cf.channel().writeAndFlush(request);
TimeUnit.SECONDS.sleep(4); // send once every 4 s
}
cf.channel().closeFuture().sync(); // block until the read timeout closes the channel
// Reconnect in a child thread and send data once more
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("Child thread: reconnecting once");
ChannelFuture cf = c.getChannelFuture();
assert cf.channel().isActive(); // assertion
// send data again
Request request = new Request();
request.setId("" + 4);
request.setName("request" + 4);
request.setRequestMessage("data message " + 4);
cf.channel().writeAndFlush(request);
cf.channel().closeFuture().sync();
System.out.println("Child thread finished");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
System.out.println("Disconnected, main thread finished..");
}
}
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response) msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1.5 Using UDP (less common)
public class Server {
public void run(int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class) // UDP: NioDatagramChannel
.option(ChannelOption.SO_BROADCAST, true) // broadcast
.handler(new ServerHandler());
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new Server().run(8765);
}
}
public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
// list of proverbs
private static final String[] DICTIONARY = {
"只要功夫深,鐵棒磨成針。",
"舊時王謝堂前燕,飛入尋常百姓家。",
"洛陽親友如相問,一片冰心在玉壺。",
"一寸光陰一寸金,寸金難買寸光陰。",
"老驥伏櫪,志在千里。烈士暮年,壯心不已!"
};
private String nextQuote() {
int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);
return DICTIONARY[quoteId];
}
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String req = packet.content().toString(CharsetUtil.UTF_8);
System.out.println(req);
if ("諺語字典查詢?".equals(req)) {
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("諺語查詢結果: " + nextQuote(), CharsetUtil.UTF_8), packet.sender()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
cause.printStackTrace();
}
}
public class Client {
public void run(int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ClientHandler());
Channel ch = b.bind(0).sync().channel();
// Broadcast a UDP message to every machine on the network segment
ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("諺語字典查詢?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync();
if (!ch.closeFuture().await(15000)) {
System.out.println("Query timed out!");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new Client().run(8765);
}
}
public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
String response = msg.content().toString(CharsetUtil.UTF_8);
if (response.startsWith("諺語查詢結果: ")) {
System.out.println(response);
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
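2. Heartbeat detection
In a cluster, the master server needs to know the state of the slave servers,
so the client sends a heartbeat packet to the server every 5 to 10 seconds.
This can be implemented with Netty plus a scheduled task.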
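The scheduled-task half can be tried on its own with the JDK. The sketch below is a hypothetical HeartbeatSketch class: it uses the same scheduleWithFixedDelay call as the client handler further down, but only counts heartbeats instead of writing them to a channel, and it stops after a fixed number so the program terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch {
    // Runs a heartbeat task until `beats` heartbeats were sent, then cancels it.
    // Returns the number of heartbeats actually sent.
    public static int run(int beats, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(beats);
        // First run immediately, then wait delayMillis after each run finishes.
        ScheduledFuture<?> heartBeat = scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() > 0) {      // ignore runs that slip in before the cancel
                sent.incrementAndGet();     // a real client would write the heartbeat here
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            heartBeat.cancel(true);         // stop sending, as on a connection error
            scheduler.shutdown();           // release the scheduler thread
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("heartbeats sent: " + run(3, 100));
    }
}
```

Cancelling the ScheduledFuture and shutting the scheduler down matters in the Netty client as well; otherwise the task keeps writing to a closed channel and the scheduler thread keeps the JVM alive.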
public final class MarshallingCodeCFactory {
public static MarshallingDecoder buildMarshallingDecoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
public class RequestInfo implements Serializable {
private String ip ;
private HashMap<String, Object> cpuPercMap ;
private HashMap<String, Object> memoryMap;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public HashMap<String, Object> getCpuPercMap() {
return cpuPercMap;
}
public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}
public HashMap<String, Object> getMemoryMap() {
return memoryMap;
}
public void setMemoryMap(HashMap<String, Object> memoryMap) {
this.memoryMap = memoryMap;
}
}
public class Server {
public static void main(String[] args) throws Exception{
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// enable logging
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHeartBeatHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
private static final String SUCCESS_KEY = "auth_success_key";
static {
AUTH_IP_MAP.put("127.0.0.1", "1234");
}
private boolean auth(ChannelHandlerContext ctx, Object msg){
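String [] ret = ((String) msg).split(",");
// Guard against malformed input: exactly "ip,key" is expected
String auth = ret.length == 2 ? AUTH_IP_MAP.get(ret[0]) : null;
if(auth != null && auth.equals(ret[1])){
// Authentication succeeded: return the confirmation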
ctx.writeAndFlush(SUCCESS_KEY);
return true;
} else {
ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
return false;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
auth(ctx, msg);
} else if (msg instanceof RequestInfo) {
RequestInfo info = (RequestInfo) msg;
System.out.println("--------------------------------------------");
System.out.println("Host ip: " + info.getIp());
System.out.println("Host cpu status: ");
HashMap<String, Object> cpu = info.getCpuPercMap();
System.out.println("Combined usage: " + cpu.get("combined"));
System.out.println("User usage: " + cpu.get("user"));
System.out.println("System usage: " + cpu.get("sys"));
System.out.println("Wait: " + cpu.get("wait"));
System.out.println("Idle: " + cpu.get("idle"));
System.out.println("Host memory status: ");
HashMap<String, Object> memory = info.getMemoryMap();
System.out.println("Total memory: " + memory.get("total"));
System.out.println("Memory used: " + memory.get("used"));
System.out.println("Memory free: " + memory.get("free"));
System.out.println("--------------------------------------------");
ctx.writeAndFlush("info received!");
} else {
ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
}
}
}
public class Client {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClienHeartBeatHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).sync();
cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClienHeartBeatHandler extends ChannelHandlerAdapter {
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private ScheduledFuture<?> heartBeat; // scheduled task
// actively send the authentication information to the server
private InetAddress addr ;
private static final String SUCCESS_KEY = "auth_success_key";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
addr = InetAddress.getLocalHost();
//String ip = addr.getHostAddress();
String ip = "127.0.0.1";
String key = "1234";
// credential
String auth = ip + "," + key;
// send the authentication
ctx.writeAndFlush(auth);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if(msg instanceof String){
String ret = (String) msg;
if(SUCCESS_KEY.equals(ret)){
// Authentication confirmed: send a heartbeat message every 5 seconds
this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS);
System.out.println(msg);
} else {
// Heartbeat acknowledgement from the server
System.out.println(msg);
}
}
} finally {
// The message is only read here and not passed on, so release its reference count manually
ReferenceCountUtil.release(msg);
}
}
private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
RequestInfo info = new RequestInfo();
//ip
info.setIp(addr.getHostAddress());
Sigar sigar = new Sigar();
//cpu prec
CpuPerc cpuPerc = sigar.getCpuPerc();
HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
cpuPercMap.put("combined", cpuPerc.getCombined());
cpuPercMap.put("user", cpuPerc.getUser());
cpuPercMap.put("sys", cpuPerc.getSys());
cpuPercMap.put("wait", cpuPerc.getWait());
cpuPercMap.put("idle", cpuPerc.getIdle());
// memory
Mem mem = sigar.getMem();
HashMap<String, Object> memoryMap = new HashMap<String, Object>();
memoryMap.put("total", mem.getTotal() / 1024L);
memoryMap.put("used", mem.getUsed() / 1024L);
memoryMap.put("free", mem.getFree() / 1024L);
info.setCpuPercMap(cpuPercMap);
info.setMemoryMap(memoryMap);
ctx.writeAndFlush(info);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// Cancel the scheduled heartbeat task
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
}
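The scheduling pattern used by ClienHeartBeatHandler can be tried without Netty or Sigar. Below is a minimal sketch, assuming a hypothetical HeartBeatDemo class in which a counter stands in for ctx.writeAndFlush(info). It shows scheduleWithFixedDelay starting the periodic task and cancel stopping it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the heartbeat logic; not part of the original project.
final class HeartBeatDemo {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final AtomicInteger sent = new AtomicInteger();
    private ScheduledFuture<?> heartBeat;

    // Start a "heartbeat" with the given delay between runs; the latch counts down once per beat.
    void start(long delayMillis, CountDownLatch beats) {
        heartBeat = scheduler.scheduleWithFixedDelay(() -> {
            sent.incrementAndGet(); // the real handler would call ctx.writeAndFlush(info)
            beats.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Cancel the task, then shut down the scheduler so its thread does not keep the JVM alive.
    boolean stop() {
        boolean cancelled = heartBeat != null && heartBeat.cancel(true);
        heartBeat = null;
        scheduler.shutdownNow();
        return cancelled;
    }

    int sentCount() {
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        HeartBeatDemo demo = new HeartBeatDemo();
        CountDownLatch beats = new CountDownLatch(3);
        demo.start(10, beats);
        boolean gotThree = beats.await(5, TimeUnit.SECONDS); // wait for three heartbeats
        System.out.println("three beats seen: " + gotThree + ", cancelled: " + demo.stop());
    }
}
```

Note that the handler above only cancels the ScheduledFuture; shutting down the executor as well, as this sketch does, is an extra step that a real client would likely want when the channel goes away.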
3. HTTP
3.1 Hello World
public final class HttpHelloWorldServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpHelloWorldServerInitializer(sslCtx));
Channel ch = b.bind(PORT).sync().channel();
System.err.println("Open your web browser and navigate to " +
(SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
public HttpHelloWorldServerInitializer(SslContext sslCtx) {
this.sslCtx = sslCtx;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HttpServerCodec()); // HTTP codec: decodes HttpRequest, encodes HttpResponse
p.addLast(new HttpHelloWorldServerHandler());
}
}
public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter {
private static final byte[] CONTENT = "HELLO WORLD".getBytes();
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
if (HttpHeaderUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
boolean keepAlive = HttpHeaderUtil.isKeepAlive(req);
// Build the response
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT));
response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8");
response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) {
// Short-lived connection: close it as soon as the response is written
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
// Persistent connection: mark the response as KEEP_ALIVE as well
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.write(response);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
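The keepAlive branch above follows the standard HTTP rule: HTTP/1.1 connections are persistent unless the request says Connection: close, while HTTP/1.0 connections close unless the request says Connection: keep-alive. The sketch below restates that rule in plain Java. KeepAliveRule is a hypothetical helper written for illustration, not Netty's implementation, and it assumes the Connection header carries a single token.

```java
import java.util.Locale;

// Hypothetical helper illustrating the HTTP keep-alive rule; not Netty code.
final class KeepAliveRule {
    // version: "HTTP/1.0" or "HTTP/1.1"; connectionHeader: the Connection header value, or null if absent.
    static boolean isKeepAlive(String version, String connectionHeader) {
        String conn = connectionHeader == null
                ? ""
                : connectionHeader.trim().toLowerCase(Locale.ROOT);
        if (conn.equals("close")) {
            return false; // an explicit close wins in every version
        }
        if (version.equals("HTTP/1.1")) {
            return true; // HTTP/1.1 is persistent by default
        }
        return conn.equals("keep-alive"); // HTTP/1.0 has to opt in
    }

    public static void main(String[] args) {
        System.out.println(isKeepAlive("HTTP/1.1", null));         // true
        System.out.println(isKeepAlive("HTTP/1.1", "close"));      // false
        System.out.println(isKeepAlive("HTTP/1.0", null));         // false
        System.out.println(isKeepAlive("HTTP/1.0", "Keep-Alive")); // true
    }
}
```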
3.2 HTTP File Download
public class HttpDownloadServer {
private static final String DEFAULT_URL = "/sources/";
public void run(final int port, final String url) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// The first argument of addLast is a user-chosen handler name
// Request decoder
ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
// Response encoder
ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
// ChunkedWriteHandler: streams a large body (such as a file) in chunks instead of loading it into memory at once
ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// HttpObjectAggregator: merges the inbound HttpRequest and its HttpContent parts into one FullHttpRequest (up to 65536 bytes)
ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
// Custom business-logic handler
ch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url));
}
});
ChannelFuture future = b.bind("127.0.0.1", port).sync();
System.out.println("HTTP file directory server started at: " + "http://localhost:" + port + url);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8765;
new HttpDownloadServer().run(port, DEFAULT_URL);
}
}
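// Note: this extends SimpleChannelInboundHandler<T>; the type parameter fixes the type of the incoming msg.
// messageReceived is the Netty 5 alpha method name; in Netty 4.x the same callback is channelRead0.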
public class HttpDownoadServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String url;
public HttpDownoadServerHandler(String url) {
this.url = url;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// Reject requests that could not be decoded
if (!request.decoderResult().isSuccess()) {
// 400
sendError(ctx, BAD_REQUEST);
return;
}
// Only GET is supported; any other method gets an error response
if (request.method() != GET) {
// 405
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
// Get the request URI
final String uri = request.uri();
// Map the URI to a local file path
final String path = parseURI(uri);
// parseURI returns null when the path is not allowed
if (path == null) {
//403
sendError(ctx, FORBIDDEN);
return;
}
// Create the File object
File file = new File(path);
// Hidden or missing file
if (file.isHidden() || !file.exists()) {
// 404
sendError(ctx, NOT_FOUND);
return;
}
// Directory
if (file.isDirectory()) {
if (uri.endsWith("/")) {
// A trailing "/" means a directory was requested: send the file listing
sendListing(ctx, file);
} else {
// No trailing "/": redirect so the client appends it and requests again
sendRedirect(ctx, uri + '/');
}
return;
}
// Neither a directory nor a regular file
if (!file.isFile()) {
// 403
sendError(ctx, FORBIDDEN);
return;
}
// Random-access handle on the file
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(file, "r"); // open read-only
} catch (FileNotFoundException fnfe) {
// 404
sendError(ctx, NOT_FOUND);
return;
}
// Get the file length
long fileLength = randomAccessFile.length();
// Create the response object (headers only, no body)
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
// Set Content-Length
HttpHeaderUtil.setContentLength(response, fileLength);
// Set Content-Type
setContentTypeHeader(response, file);
// Echo keep-alive if the request asked for it
if (HttpHeaderUtil.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Write the status line and headers first; the file body follows as separate chunks
ctx.write(response);
// Write the file as a ChunkedFile backed by the RandomAccessFile, 8192 bytes per chunk; ChunkedWriteHandler streams it
ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
// Add a transfer progress listener
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) {
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / " + total);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.out.println("Transfer complete.");
}
});
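// With chunked writes, an empty last-content message must be sent to mark the end of the body
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
// If the request was not keep-alive, the server closes the connection after the last message is sent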
if (!HttpHeaderUtil.isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
// 500; sendError closes the connection once the response is written
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
// Regex that flags unsafe characters in a URI
private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
private String parseURI(String uri) {
try {
// Decode as UTF-8
uri = URLDecoder.decode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
try {
// Fall back to ISO-8859-1
uri = URLDecoder.decode(uri, "ISO-8859-1");
} catch (UnsupportedEncodingException e1) {
// Neither charset is available: should never happen, so fail loudly and keep the cause
throw new Error(e1);
}
}
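// Validate the URI in four steps
// step 1: the URI must fall under the served URL prefix
if (!uri.startsWith(url)) {
return null;
}
// step 2: the URI must start with "/"
if (!uri.startsWith("/")) {
return null;
}
// step 3: replace "/" with the local operating system's file separator
uri = uri.replace('/', File.separatorChar);
// step 4: reject path traversal ("." segments) and unsafe characters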
if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) ||
uri.startsWith(".") || uri.endsWith(".") ||
INSECURE_URI.matcher(uri).matches()) {
return null;
}
// Build the absolute path from the project working directory plus the relative URI path
return System.getProperty("user.dir") + File.separator + uri;
}
// Regex that whitelists the file names shown in a listing
private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
// Directory listing, assembled as an HTML page
private static void sendListing(ChannelHandlerContext ctx, File dir) {
// Create the response object
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
// Response header
response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
// Build the HTML text
StringBuilder ret = new StringBuilder();
String dirPath = dir.getPath();
ret.append("<!DOCTYPE html>\r\n");
ret.append("<html><head><title>");
ret.append(dirPath);
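ret.append(" directory:");
ret.append("</title></head><body>\r\n");
ret.append("<h3>");
ret.append(dirPath).append(" directory:");
ret.append("</h3>\r\n");
ret.append("<ul>");
ret.append("<li>Link: <a href=\"../\">..</a></li>\r\n");
// Iterate over the files and generate a hyperlink for each
File[] files = dir.listFiles();
if (files == null) {
// listFiles() returns null on an I/O error or when dir is not a directory
files = new File[0];
}
for (File f : files) {
// step 1: skip hidden and unreadable files
if (f.isHidden() || !f.canRead()) {
continue;
}
String name = f.getName();
// step 2: skip file names rejected by the whitelist regex
if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
continue;
}
ret.append("<li>Link: <a href=\"");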
ret.append(name);
ret.append("\">");
ret.append(name);
ret.append("</a></li>\r\n");
}
ret.append("</ul></body></html>\r\n");
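// Encode the HTML into a temporary ByteBuf
ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8);
// Copy it into the response body
response.content().writeBytes(buffer);
// Release the temporary ByteBuf
buffer.release();
// Send the response, then close the connection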
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
// Redirect
private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
response.headers().set(LOCATION, newUri);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
// Error response
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString()+ "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void setContentTypeHeader(HttpResponse response, File file) {
// Look up the Content-Type for the file with a MIME type map
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
}
}
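The parseURI checks above guard against path traversal by matching strings such as "/." and "./". A stricter alternative is to normalize the resolved path and verify that it still lies under the served root. The following is a minimal sketch of that idea using java.nio.file; SafePathResolver and its resolve method are hypothetical names, and the sketch assumes the URI has already been URL-decoded.

```java
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; an alternative to string-based traversal checks, not the original handler's method.
final class SafePathResolver {
    // Returns the resolved absolute path, or null when the request would escape root.
    static Path resolve(Path root, String decodedUri) {
        if (decodedUri == null || !decodedUri.startsWith("/")) {
            return null;
        }
        try {
            Path base = root.toAbsolutePath().normalize();
            // Drop the leading "/" so resolve() treats the URI as relative to the root
            Path candidate = base.resolve(decodedUri.substring(1)).normalize();
            // Path.startsWith compares whole path elements, so "/www2" does not match "/www"
            return candidate.startsWith(base) ? candidate : null;
        } catch (InvalidPathException e) {
            return null; // characters the local file system cannot represent
        }
    }

    public static void main(String[] args) {
        Path root = Paths.get("www");
        System.out.println(resolve(root, "/sources/a.txt"));         // a path under www
        System.out.println(resolve(root, "/sources/../../secret")); // null
    }
}
```

Normalizing first means that a request such as /a/../b.txt is accepted and served as b.txt, while anything that climbs above the root is rejected regardless of how the ".." segments are arranged.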
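3.3 HTTP File Upload (rarely used)
In practice, mature frameworks already exist for server-side file upload: FastDFS (small files) and HDFS (large files).
To support resumable uploads, the upload progress has to be recorded; see the HTTP Range and Content-Range headers.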
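To make the resumable-upload idea concrete, here is a minimal sketch that parses a Content-Range value and derives the offset the next chunk should start from. It assumes the client sends each chunk with a header of the form bytes first-last/total; that convention, and the ContentRange class itself, are assumptions for illustration and are not implemented by the upload server in this section.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value class for a "bytes first-last/total" Content-Range header.
final class ContentRange {
    private static final Pattern BYTES = Pattern.compile("bytes (\\d+)-(\\d+)/(\\d+|\\*)");

    final long first;
    final long last;
    final long total; // -1 when the total size is unknown ("*")

    private ContentRange(long first, long last, long total) {
        this.first = first;
        this.last = last;
        this.total = total;
    }

    static ContentRange parse(String header) {
        Matcher m = BYTES.matcher(header.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported Content-Range: " + header);
        }
        long first = Long.parseLong(m.group(1));
        long last = Long.parseLong(m.group(2));
        long total = m.group(3).equals("*") ? -1 : Long.parseLong(m.group(3));
        if (last < first || (total >= 0 && last >= total)) {
            throw new IllegalArgumentException("Inconsistent Content-Range: " + header);
        }
        return new ContentRange(first, last, total);
    }

    // Offset at which the next chunk should begin; this is the progress to record.
    long nextOffset() {
        return last + 1;
    }

    // True once the final byte of a known-size upload has arrived.
    boolean isComplete() {
        return total >= 0 && last + 1 == total;
    }

    public static void main(String[] args) {
        ContentRange r = ContentRange.parse("bytes 0-499/1234");
        System.out.println(r.nextOffset() + " " + r.isComplete()); // 500 false
    }
}
```

A server would persist nextOffset() per upload and report it back when the client reconnects, so the client can resume from that byte instead of starting over.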
public final class HttpUploadServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey());
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.handler(new LoggingHandler(LogLevel.INFO));
b.childHandler(new HttpUploadServerInitializer(sslCtx));
Channel ch = b.bind(PORT