淺談高可用架構中NIO的重要性
一個功能引發的思考
今天同事開發了一個檔案讀寫的模組,發現讀寫效能異常的低,他的做法是單執行緒純IO操作,頻繁的開啟關閉IO流,讀寫。
於是乎他問我這個應該怎麼做,我給他講解到這種做法的低效,建議他批量的一次性寫入,頻繁直接操作IO效能當然是無法接受的。
再談IO操作的演變
BIO:傳統的cs端架構,都是一個請求提交,後臺一個專門的執行緒負責接受這個請求,分配給新的執行緒去處理。這種做法的缺點很明顯,當併發量過大的時候,不斷的建立新的執行緒,對於伺服器來說,肯定是無法接受的,這樣會導致大量的請求阻塞未響應。而且建立執行緒這個操作java是直接操作 作業系統核心的,光建立執行緒這個,就會導致系統假死,cpu100等情況。
NIO:直到jdk1.4以後,才推出了非阻塞的IO操作,基於I/O多路複用技術,減小系統開銷。即,把多個I/O的阻塞複用到同一個select上阻塞,從而使得單執行緒情況下,可以同時處理多個請求。系統不需要建立新的額外程序或者執行緒,也不需要維護這些程序和執行緒的執行。並且在jdk1.5中,epool替代傳統select/poll,極大提升NIO通訊新能
jdk nio存在的缺陷
1.開發複雜度非常高,例如:客戶端重連,網路閃斷,半包讀寫,失敗快取,網路擁塞,異常碼流的處理等。
2.jdk epoll bug會導致selector空輪詢,最終導致cpu100
基於Netty實現NIO
Netty算是一個比較成熟的nio框架了,他解決了jdk中epoll的bug,並且開發複雜度也比較低一點。
後面上一個簡單的netty nio實現原始碼
基於Netty開發應用層協議
隨著網際網路發展,傳統垂直架構逐漸被分散式,彈性可伸縮分散式架構替代。
那麼問題也隨之而出,系統只有分散式,就面臨各個節點間通訊的問題,尤為強調高可用,擴充套件性強,高效能。
1.高效能
傳統java序列化效能比較低,而且序列化後的位元組流太大,現在出的很多新的序列化框架,效能都比傳統的jdk要高很多,如:google的protobuf facebook的thrift jboss的marshalling。另外netty對非同步非阻塞通訊的支援,以及高效reactor執行緒模型,無鎖化序列設計,0拷貝,靈活tcp引數配置等。
2.擴充套件性
傳統java序列化無法跨語言,所以目前的java rpc框架基本也都沒用jdk的,比如thirft:可以跨c++,c#,cocoa,erlang,java,perl,php,python,ruby等
3.高可用
傳統bio模型,對併發訪問支援很差,而新的nio就不同,具體的上面也談到了
4.可靠性
1>網路通訊類
連線超時介面,強制關閉對端連線
2>鏈路有效性
通過心跳檢查
3>reactor執行緒保護
主要謹慎處理I/O異常,以及規避NIO bug
4>鏈路控制
5>優雅停機
5.安全性
安全性可以通過netty提供的SSL認證,也可以通過第三方CA認證來保障
6.成功案例
目前也已經有很多成熟應用netty的框架,如alibaba的rocketMQ dubbo ,Apache的avro等等
副一段Netty基礎NIO程式碼
package com.solace.nio;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Created by Administrator on 2018/4/10.
*
* @author solace
*/
public class TimerServer {
public void bind(int port) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
//繫結埠,同步等待成功
ChannelFuture future = bootstrap.bind(port).sync();
//等待服務端監聽埠關閉
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new TimerServer().bind(8088);
}
}
package com.solace.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.logging.Logger;
/**
* Created by Administrator on 2018/4/10.
*
* @author solace
*/
public class TimeClientHandler extends ChannelHandlerAdapter{
public static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
private final ByteBuf fisrtMessage;
public TimeClientHandler() {
byte[] req = "QUERY TIME ORDER".getBytes();
fisrtMessage = Unpooled.buffer(req.length);
fisrtMessage.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(fisrtMessage);
}
/**
* 解碼讀取訊息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("Now is :"+body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warning("Unexpercted exception from downstream :"+cause.getMessage());
ctx.close();
}
}
package com.solace.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Date;
/**
* Created by Administrator on 2018/4/10.
*
* @author solace
*/
public class TimeServerHandler extends ChannelHandlerAdapter{
/**
* 讀到訊息後回執
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("the time server receive order:"+body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"bad order";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
/**
* 訊息讀完了後flush
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 訊息讀取異常處理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
package com.solace.nio;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Created by Administrator on 2018/4/10.
*
* @author solace
*/
public class TimeClient {
public void connect(int port,String host) throws InterruptedException {
//配置NIO執行緒租
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//非同步發起連線請求
ChannelFuture future = bootstrap.connect(host,port).sync();
//等待客戶端鏈路關閉
future.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new TimeClient().connect(8088,"127.0.0.1");
}
}
package com.solace.nio;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.util.logging.SocketHandler;
/**
* Created by Administrator on 2018/4/10.
*
* @author solace
*/
public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
/**
* 新增自定義處理類
* @param channel
* @throws Exception
*/
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new TimeServerHandler());
}
}