netty4 服務端同步客戶端返回的結果
阿新 • • 發佈:2018-12-14
netty是一個非同步通訊框架,在有的時候我們想使用服務端向客戶端傳送訊息,服務端同步等待客戶端返回結果然進行下一步的業務邏輯操作。那要怎麼做才能同步獲取客戶端返回的資料呢?這裡我用到了JDK中的閉鎖等待 CountDownLatch,接下來看看程式碼如何實現:
服務端:
package com.example.demo.server; import com.example.demo.cache.ChannelMap; import com.example.demo.model.Result; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import lombok.extern.slf4j.Slf4j; /** * @ClassName: NettyServer * @Author: huangzf * @Date: 2018/9/25 15:40 * @Description: */ @Slf4j public class NettyServer { private NettyServerChannelInitializer serverChannelInitializer = null; private int port = 8000; public void bind() throws Exception { //配置服務端的NIO執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { serverChannelInitializer = new NettyServerChannelInitializer(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //保持長連線 .childOption(ChannelOption.SO_KEEPALIVE,true) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(serverChannelInitializer); //繫結埠,同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待伺服器監聽埠關閉 f.channel().closeFuture().sync(); } finally { //釋放執行緒池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public Result write(Object obj, String tenantId ,String uniId) throws Exception { // 獲取鎖 Lock lock = ChannelMap.getChannelLock(tenantId); try { Channel channel = ChannelMap.getChannel(tenantId); if(channel != null){ lock.lock(); if(channel.isOpen()){ // 設定同步 CountDownLatch latch = new CountDownLatch(1); NettyServerHandler nettyServerHandler = (NettyServerHandler) channel.pipeline().get("handler"); nettyServerHandler.resetSync(latch,1); nettyServerHandler.setUnidId(uniId); channel.writeAndFlush(obj ); //同步返回結果 if (latch.await(60,TimeUnit.SECONDS)){ // printerServerHandler.setTimeout(0); return nettyServerHandler.getResult(); } //如果超時,將超時標誌設定為1 //printerServerHandler.setTimeout(1); log.error("請求超時60s"); return new Result(2,"請求超時",null); }else{ return new Result(0,"客戶端已關閉!",null); } } }catch (Exception e){ e.printStackTrace(); return new Result(0,"服務出錯!",null); }finally { if (lock != null){ lock.unlock(); } } return new Result(0,"客戶端沒有連線!",null); } public static void main(String[] args) throws Exception { new NettyServer().bind(); } }
程式碼中write方法是業務程式碼呼叫服務端向客戶端傳送資訊的統一入口,這裡用了Lock是為了防止併發操作影響資料返回的問題,這裡每個客戶端通道分配一個鎖。latch.await(60,TimeUnit.SECONDS) 是為了阻塞程式,等待客戶端返回結果,如果60s內沒有返回結果則釋放鎖並返回請求超時。
服務端NettyServerChannelInitializer 的實現
package com.example.demo.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * @ClassName: NettyServerChannelInitializer * @Author: huangzf * @Date: 2018/9/25 15:43 * @Description: */ public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> { private NettyServerHandler handler ; @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers .weakCachingConcurrentResolver(this.getClass().getClassLoader()))); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast(new IdleStateHandler(40,0,0,TimeUnit.SECONDS)); //伺服器的邏輯 handler = new NettyServerHandler(); pipeline.addLast("handler", handler); } }
這裡使用了物件進行資料傳輸,避免了客戶端重新解析組裝物件的麻煩
package com.example.demo.server; import com.example.demo.cache.ChannelMap; import com.example.demo.model.Result; import com.example.demo.model.Tenant; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; /** * @ClassName: NettyServerHandler * @Author: huangzf * @Date: 2018/9/25 15:44 * @Description: */ @Slf4j public class NettyServerHandler extends SimpleChannelInboundHandler { private CountDownLatch latch; /** * 訊息的唯一ID */ private String unidId = ""; /** * 同步標誌 */ private int rec; /** * 客戶端返回的結果 */ private Result result; /** * 心跳丟失次數 */ private int counter = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Client say : " + msg.toString()); if(msg instanceof Tenant){ ChannelMap.setChannel(((Tenant) msg).getTenantId(),ctx.channel()); ChannelMap.setChannelLock(((Tenant) msg).getTenantId(),new ReentrantLock()); } counter = 0; if(rec == 1 && msg instanceof Result){ Result re = (Result) msg; //校驗返回的資訊是否是同一個資訊 if (unidId.equals(re.getUniId())){ latch.countDown();//訊息返回完畢,釋放同步鎖,具體業務需要判斷指令是否匹配 rec = 0; result = re; } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("RemoteAddress : " + ctx.channel().remoteAddress().toString()+ " active !"); super.channelActive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)){ // 空閒40s之後觸發 (心跳包丟失) if (counter >= 3) { // 連續丟失3個心跳包 (斷開連線) ctx.channel().close().sync(); log.error("已與"+ctx.channel().remoteAddress()+"斷開連線"); System.out.println("已與"+ctx.channel().remoteAddress()+"斷開連線"); } else { counter++; log.debug(ctx.channel().remoteAddress() + "丟失了第 " + counter + " 個心跳包"); System.out.println("丟失了第 " + counter + " 個心跳包"); } } } } public void resetSync(CountDownLatch latch, int rec) { this.latch = latch; this.rec = rec; } public void setUnidId(String s){ this.unidId = s; } public Result getResult() { return result; } }
在channelRead0方法中 如果讀取到的資訊是Tenant (客戶端剛連線上傳送的訊息)就為該客戶端關聯一個唯一標誌和分配一個鎖Lock(用於併發操作)
如果讀取到的資訊是Result(客戶端響服務端的訊息)就判斷其是否是同一個訊息(服務端傳送的訊息中帶有該訊息的唯一id,客戶端返回時也要帶上該id),如果是就latch.countDown() 釋放同步鎖,這樣就可以使得服務端同步得到客戶端返回的訊息了。
詳情與客戶端程式碼請移步碼雲: