1. 程式人生 > >spring boot 使用 CommandLineRunner 啟動netty做 RPC

spring boot 使用 CommandLineRunner 啟動netty做 RPC

(1) 啟動是重寫 CommandLineRunner
(2) 寫NettyServerListener Component
(3) 實現 TdgisnettyApplication run 方法
主要實現在SpringApplication 啟動後啟動NettyServer
下面是程式碼
@Component

public class NettyServerListener {
    private static final Logger LOGGER = LoggerFactory.getLogger( NettyServerListener.class );
    /**
     * 建立bootstrap
     */
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    /**
     * BOSS
     */
    EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * Worker
     */
    EventLoopGroup work = new NioEventLoopGroup();
    /**
     * 通道介面卡
     */
    @Resource
    private ServerChannelHandlerAdapter channelHandlerAdapter;
    @Resource
    private NettyConfig nettyConfig;
    @PreDestroy
    public void close() {
        LOGGER.info("關閉伺服器....");
        //優雅退出
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }

    /**
     * 開啟及服務執行緒
     */
    public void start() {
        // 從配置檔案中(application.yml)獲取服務端監聽埠號
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                .option( ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler( LogLevel.INFO));
        try {
            //設定事件處理
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(nettyConfig.getMaxFrameLength()
                            , 0, 2, 0, 2));
                    pipeline.addLast(new LengthFieldPrepender(2));
                    pipeline.addLast(new ObjectCodec());

                    pipeline.addLast(channelHandlerAdapter);
                }
            });
            LOGGER.info("netty伺服器在[{}]埠啟動監聽", port);
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.info("[出現異常] 釋放資源");
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
//ChannelHandlerAdapter 負責資料進入並在ChannelPipeline中按照從上至下的順序查詢呼叫相應的BoundHandler
@Component
@ChannelHandler.Sharable
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    private  Logger logger = LoggerFactory.getLogger(  ServerChannelHandlerAdapter.class);
    @Resource
    private RequestDispatcher dispatcher;
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
        // 遮蔽toString()方法
        if (invokeMeta.getMethodName().endsWith("toString()")
                && !"class java.lang.String".equals(invokeMeta.getReturnType().toString()))
            logger.info("客戶端傳入引數 :{},返回值:{}",
                    invokeMeta.getArgs(), invokeMeta.getReturnType());
        dispatcher.dispatcher(ctx, invokeMeta);
    }
}
//通過執行緒池來處理多使用者請求的訊息,回覆訊息
@Component
public class RequestDispatcher implements ApplicationContextAware {
    private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads());
    private ApplicationContext app;
    public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
        executorService.submit(() -> {
            ChannelFuture f = null;
            try {
                Class<?> interfaceClass = invokeMeta.getInterfaceClass();
                String name = invokeMeta.getMethodName();
                Object[] args = invokeMeta.getArgs();
                Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
                Object targetObject = app.getBean(interfaceClass);
                Method method = targetObject.getClass().getMethod(name, parameterTypes);
                Object obj = method.invoke(targetObject, args);
                if (obj == null) {
                    f = ctx.writeAndFlush(NullWritable.nullWritable());
                } else {
                    f = ctx.writeAndFlush(obj);
                }
                f.addListener( ChannelFutureListener.CLOSE);
            } catch (Exception e) {
                ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR);
                f = ctx.writeAndFlush(error);
            } finally {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        });
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.app = applicationContext;
    }
}

//rpc 呼叫的處理類 MethodInvokeMeta

@Component
public class MethodInvokeMeta implements Serializable {
    private static final long serialVersionUID = -3991505734959724273L;
    //介面
    private Class<?> interfaceClass;
    //方法名
    private String methodName;
    //引數
    private Object[] args;
    //返回值型別
    private Class<?> returnType;
    //引數型別
    private Class<?>[] parameterTypes;

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public Class<?> getInterfaceClass() {
        return interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Class getReturnType() {
        return returnType;
    }

    public void setReturnType(Class returnType) {
        this.returnType = returnType;
    }
}
  //空的異常處理
    @Component
	public class NullWritable implements Serializable {
    private static final long serialVersionUID = 1857236694251465532L;
    private static NullWritable instance = new NullWritable();

    private NullWritable() {
    }

    public static NullWritable nullWritable() {
        return instance;
    }
}
// 訊息序列化處理
public class ObjectCodec extends MessageToMessageCodec<ByteBuf, Object> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, List<Object> list) throws Exception {
        byte[] data = ObjectSerializerUtils.serilizer(o);
        ByteBuf buf = Unpooled.buffer();
        buf.writeBytes(data);
        list.add(buf);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);
        list.add(deSerilizer);
    }
}

public class ObjectSerializerUtils {
private static final Logger logger = LoggerFactory.getLogger(ObjectSerializerUtils.class);

/**
 * 反序列化
 *
 * @param data
 * @return
 */
public static Object deSerilizer(byte[] data) {
    if (data != null && data.length > 0) {
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            ObjectInputStream ois = new ObjectInputStream(bis);
            return ois.readObject();
        } catch (Exception e) {
            logger.info("[異常資訊] {}", e.getMessage());
            e.printStackTrace();
        }
        return null;
    } else {
        logger.info("[反序列化] 入參為空");
        return null;
    }
}

/**
 * 序列化物件
 *
 * @param obj
 * @return
 */
public static byte[] serilizer(Object obj) {
    if (obj != null) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush();
            oos.close();
            return bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    } else {
        return null;
    }
    }
}