Spring Boot 使用 CommandLineRunner 啟動 Netty 實現 RPC
阿新 • • 發佈:2018-12-29
(1) 啟動類實現 CommandLineRunner 介面
(2) 寫NettyServerListener Component
(3) 實現 TdgisnettyApplication run 方法
主要目的是在 SpringApplication 啟動完成後再啟動 NettyServer
下面是程式碼
@Component
public class NettyServerListener { private static final Logger LOGGER = LoggerFactory.getLogger( NettyServerListener.class ); /** * 建立bootstrap */ ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * BOSS */ EventLoopGroup boss = new NioEventLoopGroup(); /** * Worker */ EventLoopGroup work = new NioEventLoopGroup(); /** * 通道介面卡 */ @Resource private ServerChannelHandlerAdapter channelHandlerAdapter; @Resource private NettyConfig nettyConfig; @PreDestroy public void close() { LOGGER.info("關閉伺服器...."); //優雅退出 boss.shutdownGracefully(); work.shutdownGracefully(); } /** * 開啟及服務執行緒 */ public void start() { // 從配置檔案中(application.yml)獲取服務端監聽埠號 int port = nettyConfig.getPort(); serverBootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .option( ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler( LogLevel.INFO)); try { //設定事件處理 serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(nettyConfig.getMaxFrameLength() , 0, 2, 0, 2)); pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new ObjectCodec()); pipeline.addLast(channelHandlerAdapter); } }); LOGGER.info("netty伺服器在[{}]埠啟動監聽", port); ChannelFuture f = serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.info("[出現異常] 釋放資源"); boss.shutdownGracefully(); work.shutdownGracefully(); } } } //ChannelHandlerAdapter 負責資料進入並在ChannelPipeline中按照從上至下的順序查詢呼叫相應的BoundHandler @Component @ChannelHandler.Sharable public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter { private Logger logger = LoggerFactory.getLogger( ServerChannelHandlerAdapter.class); @Resource private RequestDispatcher dispatcher; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void 
channelRead(ChannelHandlerContext ctx, Object msg) { MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg; // 遮蔽toString()方法 if (invokeMeta.getMethodName().endsWith("toString()") && !"class java.lang.String".equals(invokeMeta.getReturnType().toString())) logger.info("客戶端傳入引數 :{},返回值:{}", invokeMeta.getArgs(), invokeMeta.getReturnType()); dispatcher.dispatcher(ctx, invokeMeta); } } //通過執行緒池來處理多使用者請求的訊息,回覆訊息 @Component public class RequestDispatcher implements ApplicationContextAware { private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads()); private ApplicationContext app; public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) { executorService.submit(() -> { ChannelFuture f = null; try { Class<?> interfaceClass = invokeMeta.getInterfaceClass(); String name = invokeMeta.getMethodName(); Object[] args = invokeMeta.getArgs(); Class<?>[] parameterTypes = invokeMeta.getParameterTypes(); Object targetObject = app.getBean(interfaceClass); Method method = targetObject.getClass().getMethod(name, parameterTypes); Object obj = method.invoke(targetObject, args); if (obj == null) { f = ctx.writeAndFlush(NullWritable.nullWritable()); } else { f = ctx.writeAndFlush(obj); } f.addListener( ChannelFutureListener.CLOSE); } catch (Exception e) { ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR); f = ctx.writeAndFlush(error); } finally { f.addListener(ChannelFutureListener.CLOSE); } }); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.app = applicationContext; } }
//rpc 呼叫的處理類 MethodInvokeMeta
@Component public class MethodInvokeMeta implements Serializable { private static final long serialVersionUID = -3991505734959724273L; //介面 private Class<?> interfaceClass; //方法名 private String methodName; //引數 private Object[] args; //返回值型別 private Class<?> returnType; //引數型別 private Class<?>[] parameterTypes; public Object[] getArgs() { return args; } public void setArgs(Object[] args) { this.args = args; } public Class<?> getInterfaceClass() { return interfaceClass; } public void setInterfaceClass(Class<?> interfaceClass) { this.interfaceClass = interfaceClass; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } public Class getReturnType() { return returnType; } public void setReturnType(Class returnType) { this.returnType = returnType; } } //空的異常處理 @Component public class NullWritable implements Serializable { private static final long serialVersionUID = 1857236694251465532L; private static NullWritable instance = new NullWritable(); private NullWritable() { } public static NullWritable nullWritable() { return instance; } } // 訊息序列化處理 public class ObjectCodec extends MessageToMessageCodec<ByteBuf, Object> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception { byte[] data = ObjectSerializerUtils.serilizer(o); ByteBuf buf = Unpooled.buffer(); buf.writeBytes(data); list.add(buf); } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes); list.add(deSerilizer); } }
/**
 * Static helpers that convert objects to and from byte arrays using Java
 * native serialization. Both methods report failure by returning {@code null}.
 */
public class ObjectSerializerUtils {

    private static final Logger logger = LoggerFactory.getLogger(ObjectSerializerUtils.class);

    /**
     * Deserializes a single object from {@code data}.
     *
     * @param data serialized bytes; may be {@code null} or empty
     * @return the deserialized object, or {@code null} when {@code data} is
     *         {@code null}/empty or deserialization fails
     */
    public static Object deSerilizer(byte[] data) {
        if (data == null || data.length == 0) {
            logger.info("[反序列化] 入參為空");
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (Exception e) {
            // Broad catch kept on purpose: callers rely on null (not an exception) for any failure.
            // The throwable goes to the logger (with stack trace) instead of printStackTrace().
            logger.error("[異常資訊] {}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Serializes {@code obj} into a byte array.
     *
     * @param obj object to serialize; may be {@code null}
     * @return the serialized bytes, or {@code null} when {@code obj} is
     *         {@code null} or serialization fails
     */
    public static byte[] serilizer(Object obj) {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
            out.flush();
        } catch (IOException e) {
            // Previously only printed to stderr; route through the logger with the stack trace.
            logger.error("[異常資訊] {}", e.getMessage(), e);
            return null;
        }
        // Read the bytes only after the object stream has been closed, as the original did.
        return buffer.toByteArray();
    }
}