suging閒談-netty 的非同步非阻塞IO執行緒與業務執行緒分離
前言
surging 對外沉寂了一段時間了,但是作者並沒有閒著,而是針對於客戶的需要添加了不少功能,也給我帶來了不少外快收益, 就比如協議轉化,consul 的watcher 機制,JAVA版本,skywalking 升級支援8.0,.升級NET 6.0 ,而客戶自己擴充套件支援服務編排流程引擎,後期客戶還需要擴充套件定製coap ,XMPP等協議。而今天寫這篇文章的目的針對於修改基於netty 的非同步非阻塞業務邏輯操作
問題描述
年前客戶把JAVA版本進行了測試,產生了不少問題,客戶也比較茫然,因為有記憶體洩漏,通過jmeter壓測,併發始終上不來,通過半個月的努力,終於把問題解決了,預估JAVA版本併發能達到2萬左右,以下是客戶通過設定jmeter壓測例項
解決方案
當客戶把問題拋給我後,我第一反應是IO執行緒被阻塞造成的,而這樣就可以把問題定位在netty 的處理上,而處理server 端程式碼是NettyServerMessageListener,而其中ServerHandler的channelRead是處理業務邏輯的,在這當中我是通過ThreadPoolExecutor執行非同步處理,可以看看NettyServerMessageListener程式碼:
public class NettyServerMessageListener implements IMessageListener { private Thread thread;private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class); private ChannelFuture channel; private final ITransportMessageDecoder transportMessageDecoder; private final ITransportMessageEncoder transportMessageEncoder; ReceivedDelegate Received = newReceivedDelegate(); @Inject public NettyServerMessageListener( ITransportMessageCodecFactory codecFactory) { this.transportMessageEncoder = codecFactory.GetEncoder(); this.transportMessageDecoder = codecFactory.GetDecoder(); } public void StartAsync(final String serverAddress) { thread = new Thread(new Runnable() { int parallel = Runtime.getRuntime().availableProcessors(); final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(parallel); ThreadFactory threadFactory = new DefaultThreadFactory("rpc-netty", true); public void run() { String[] array = serverAddress.split(":"); logger.debug("準備啟動服務主機,監聽地址:" + array[0] + "" + array[1] + "。"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(parallel,threadFactory); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true).childOption(ChannelOption.TCP_NODELAY, true).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new LengthFieldPrepender(4)) .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) .addLast(new ServerHandler(eventExecutors,new ReadAction<ChannelHandlerContext, TransportMessage>() { @Override public void run() { IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter); onReceived(sender, this.parameter1); } },transportMessageDecoder) ); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); try { String host = array[0]; int port = Integer.parseInt(array[1]); channel = bootstrap.bind(host, port).sync(); logger.debug("服務主機啟動成功,監聽地址:" + serverAddress + "。"); } catch (Exception e) { if (e instanceof InterruptedException) { logger.info("Rpc server remoting server stop"); } else { logger.error("Rpc server remoting server error", e); } } } }); thread.start(); } @Override public ReceivedDelegate getReceived() { return Received; } public void onReceived(IMessageSender sender, TransportMessage message) { if (Received == null) return; Received.notifyX(sender,message); } private class ReadAction<T,T1> implements Runnable { public T parameter; public T1 parameter1; public void setParameter( T tParameter,T1 tParameter1) { parameter = tParameter; parameter1 = tParameter1; } @Override public void run() { } } private class ServerHandler extends ChannelInboundHandlerAdapter { private final DefaultEventLoopGroup serverHandlerPool; private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable; private final ITransportMessageDecoder transportMessageDecoder; public ServerHandler(final DefaultEventLoopGroup threadPoolExecutor, ReadAction<ChannelHandlerContext, TransportMessage> runnable, ITransportMessageDecoder transportMessageDecoder) { this.serverHandlerPool = threadPoolExecutor; this.serverRunnable = runnable; this.transportMessageDecoder = transportMessageDecoder; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.warn("與伺服器:" + ctx.channel().remoteAddress() + "通訊時傳送了錯誤。"); ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext context) { context.flush(); } @Override public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception { ByteBuf buffer = (ByteBuf) message; try { byte[] data = new byte[buffer.readableBytes()]; buffer.readBytes(data); serverHandlerPool.execute(() -> { TransportMessage transportMessage = null; try { transportMessage = transportMessageDecoder.Decode(data); } catch (IOException e) { e.printStackTrace(); } serverRunnable.setParameter(channelHandlerContext, transportMessage); serverRunnable.run(); }); } finally { ReferenceCountUtil.release(message); } } } }
ThreadPoolExecutor程式碼:
public static ThreadPoolExecutor makeServerThreadPool(final String serviceName, int corePoolSize, int maxPoolSize) { ThreadPoolExecutor serverHandlerPool = new ThreadPoolExecutor( corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( 10000)); /* new LinkedBlockingQueue<Runnable>(10000), r -> new Thread(r, "netty-rpc-" + serviceName + "-" + r.hashCode()), new ThreadPoolExecutor.AbortPolicy());*/ return serverHandlerPool; }
後面通過查詢官方的文件發現以下addLast是IO執行緒阻塞呼叫
.addLast(new ServerHandler(eventExecutors,new ReadAction<ChannelHandlerContext, TransportMessage>() {
@Override
public void run() {
IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);
onReceived(sender, this.parameter1);
}
},transportMessageDecoder)
後面通過使用EventExecutorGroup把IO執行緒與業務執行緒進行分離,把耗時業務處理新增到EventExecutorGroup進行處理,首先EventExecutorGroup程式碼如下
public static final EventExecutorGroup execThreadPool = new DefaultEventExecutorGroup( Runtime.getRuntime().availableProcessors()*2, (ThreadFactory) r -> { Thread thread = new Thread(r); thread.setName("custom-tcp-exec-"+r.hashCode()); return thread; }, 100000, RejectedExecutionHandlers.reject() );
而addLast的ServerHandler添加了EventExecutorGroup, 最新的NettyServerMessageListener程式碼如下:
public class NettyServerMessageListener implements IMessageListener { private Thread thread; private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class); private ChannelFuture channel; private final ITransportMessageDecoder transportMessageDecoder; private final ITransportMessageEncoder transportMessageEncoder; ReceivedDelegate Received = new ReceivedDelegate(); @Inject public NettyServerMessageListener( ITransportMessageCodecFactory codecFactory) { this.transportMessageEncoder = codecFactory.GetEncoder(); this.transportMessageDecoder = codecFactory.GetDecoder(); } public void StartAsync(final String serverAddress) { thread = new Thread(new Runnable() { public void run() { String[] array = serverAddress.split(":"); logger.debug("準備啟動服務主機,監聽地址:" + array[0] + "" + array[1] + "。"); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new LengthFieldPrepender(4)) .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) .addLast(ThreadPoolUtil.execThreadPool, "handler",new ServerHandler(new ReadAction<ChannelHandlerContext, TransportMessage>() { @Override public void run() { IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter); onReceived(sender, this.parameter1); } },transportMessageDecoder) ); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); try { String host = array[0]; int port = Integer.parseInt(array[1]); channel = bootstrap.bind(host, port).sync(); logger.debug("服務主機啟動成功,監聽地址:" + serverAddress + "。"); } catch (Exception e) { if (e instanceof InterruptedException) { logger.info("Rpc server remoting server stop"); } else { logger.error("Rpc server remoting server error", e); } } } }); thread.start(); } @Override public ReceivedDelegate getReceived() { return Received; } public void onReceived(IMessageSender sender, TransportMessage message) { if (Received == null) return; Received.notifyX(sender,message); } private class ReadAction<T,T1> implements Runnable { public T parameter; public T1 parameter1; public void setParameter( T tParameter,T1 tParameter1) { parameter = tParameter; parameter1 = tParameter1; } @Override public void run() { } } private class ServerHandler extends ChannelInboundHandlerAdapter { private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable; private final ITransportMessageDecoder transportMessageDecoder; public ServerHandler(ReadAction<ChannelHandlerContext, TransportMessage> runnable, ITransportMessageDecoder transportMessageDecoder) { this.serverRunnable = runnable; this.transportMessageDecoder = transportMessageDecoder; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.warn("與伺服器:" + ctx.channel().remoteAddress() + "通訊時傳送了錯誤。"); ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext context) { context.flush(); } @Override public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception { ByteBuf buffer = (ByteBuf) message; try { byte[] data = new byte[buffer.readableBytes()]; buffer.readBytes(data); TransportMessage transportMessage = transportMessageDecoder.Decode(data); serverRunnable.setParameter(channelHandlerContext, transportMessage); serverRunnable.run(); } finally { ReferenceCountUtil.release(message); } } } }
通過以上修改,再通過jmeter壓測已經不會出現timeout 問題,就連stage 閘道器-》.NET微服務-》JAVA微服務都沒有Time out問題產生,jmeter的user thread拉長到2000也沒有出現問題。
通過以上思路把.NET版本的surging 社群版本也進行了修改,已經提交到github,首先把ServiceHost中的serverMessageListener.Received 中的Task.Run移除,ServerHandler中ChannelRead進行移除,然後addLast的ServerHandler添加了EventExecutorGroup.通過以上修改再通過壓測發現可以支援20萬+ ,也未發現記憶體洩漏問題,執行client 1萬次 ,服務端cpu 在6%左右,響應速度在1.1秒左右,可以開啟多個surging 的client 進行壓測,cpu 會疊加上升,響應速度沒有影響,以下是執行1萬次壓測
總結
通過5年研發,surging 從原來的最初的基於netty 的RPC發展到現在可以支援多協議,多語言的異構微服務引擎,不僅是技術的提高,也帶來名利的收益,只要不斷堅持,終究能看到成果,我也會一直更新,為企業和社群使用者帶來自己的綿薄之力,讓企業能更好的掌握微服務解決方案,已解決現在行業各種不同的業務需求。