1. 程式人生 > 實用技巧 > SpringBoot整合Netty實現http服務(類似SpringMVC的controller層實現)

SpringBoot整合Netty實現http服務(類似SpringMVC的controller層實現)

SpringBoot中使用Netty與spring中使用Netty沒有差別,在Spring中使用Netty可以考慮Netty的啟動時機,可以在Bean載入的時候啟動,可以寫一個自執行的函式啟動,這裡採用監聽Spring容器的啟動事件來啟動Netty。

實現類似SpringMVC的controller層實現:

新增依賴:

<!-- Spring Boot web starter with the embedded Tomcat starter excluded,
     so that Netty (below) is the only HTTP server in the application. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Netty (bootstrap, channel, HTTP codec and handler classes used by the server code). -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.1.Final</version>
</dependency>
<!-- Apache Commons Lang 2.x: provides org.apache.commons.lang.StringUtils.
     NOTE(review): ${commons.lang.version} must be defined in this POM's <properties>
     or in a parent POM; it is not shown here. -->
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>${commons.lang.version}</version>
</dependency>

排除tomcat的依賴

Netty Http服務端編寫:

handler 處理類

@Component
@Slf4j
@ChannelHandler.Sharable //@Sharable 註解用來說明ChannelHandler是否可以在多個channel直接共享使用
public class HttpServerHandler  extends ChannelInboundHandlerAdapter {

    private static Map<String, Action> actionMapping = null;

    public Map<String, Action> getActionMapping(){
        
if(actionMapping == null){ return actionMapping = ClassLoaderUtil.buildActionMapping(); } return actionMapping; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { if (msg instanceof FullHttpRequest) { FullHttpRequest req
= (FullHttpRequest) msg; if (HttpUtil.is100ContinueExpected(req)) { ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)); } //封裝請求和響應 HttpRequestWrapper httpRequestWrapper = buildRequestWraper(req); //建造netty響應 HttpResponseWrapper httpResponseWrapper = new HttpResponseWrapper(); Action action = getActionMapping().get(httpRequestWrapper.getUri()); if(action != null){ Object responseBody = null; Object object = action.getMethod().invoke(action.getBean(),buildArgs( action, httpRequestWrapper.getParams(),httpRequestWrapper)); if(StringUtils.isNotEmpty(action.getResponseType()) && action.getResponseType().equals("JSON")){ responseBody = JSON.toJSONString(object); }else{ responseBody = object; } httpResponseWrapper.write(object.toString().getBytes("UTF-8")); } FullHttpResponse response = buildResponse(httpResponseWrapper); boolean keepAlive = HttpUtil.isKeepAlive(req); if (!keepAlive) { ctx.write(response).addListener(ChannelFutureListener.CLOSE); } else { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); ctx.write(response).addListener(ChannelFutureListener.CLOSE); } } //負責顯式釋放與池的ByteBuf例項相關聯的記憶體,SimpleChannelInboundHandler會自動釋放資源,因此無需顯式釋放 ReferenceCountUtil.release(msg); } catch (Exception e) { log.error("system exception:{}", e); } } /** * 構建請求物件 * @param req * @return */ private HttpRequestWrapper buildRequestWraper(FullHttpRequest req) { HashMap<String, String> headersMap = new HashMap<String, String>(16); for (Map.Entry<String, String> entry : req.headers()) { headersMap.put(entry.getKey(), entry.getValue()); } byte[] content = new byte[req.content().readableBytes()]; req.content().readBytes(content); String url = req.uri(); String params = ""; if(url.indexOf("?")>0){ String[] urls = url.split("\\?"); url = urls[0]; params = urls[1]; } return new HttpRequestWrapper(req.method().name(), url, headersMap, content, params); } /** * 構建響應物件 * @param httpResponseWrapper * @return */ private 
FullHttpResponse buildResponse(HttpResponseWrapper httpResponseWrapper) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(httpResponseWrapper.getStatusCode()), Unpooled.wrappedBuffer(httpResponseWrapper.content())); HttpHeaders headers = response.headers(); headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); headers.set(HttpHeaderNames.CONTENT_TYPE, new AsciiString("application/json; charset=utf-8")); for (Map.Entry<String, String> entry : httpResponseWrapper.headers().entrySet()) { headers.set(entry.getKey(), entry.getValue()); } return response; } /** * 構建請求引數 * @param action * @param urlParams * @param httpRequestWrapper * @return */ public Object[] buildArgs(Action action,String urlParams,HttpRequestWrapper httpRequestWrapper){ if(action == null){ return null; } LocalVariableTableParameterNameDiscoverer u = new LocalVariableTableParameterNameDiscoverer(); //獲取處理方法的引數 String[] params = u.getParameterNames(action.getMethod()); Object[] objects = new Object[params.length]; Map<String,String> paramMap = new HashMap<>(); try{ if(StringUtils.isNotEmpty(urlParams)){ paramMap = UrlUtils.URLRequest(urlParams); } if( httpRequestWrapper.content()!=null){ Parameter[] parameters = action.getMethod().getParameters(); for(Parameter parameter : parameters){ Annotation annotation = parameter.getAnnotation(ActionBody.class); if(annotation == null){ continue; } int index = Integer.parseInt(parameter.getName().substring(3)); paramMap.put(params[index],new String(httpRequestWrapper.content(),"UTF-8")); } } for( int i = 0 ;i<params.length; i++){ final int flag = i; paramMap.forEach((k,v)->{ if(k.equals(params[flag])){ objects[flag] = v; } }); } }catch(Exception e){ log.error(e.getMessage()); } return objects; } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 當客戶端斷開連線 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
ctx.close(); } }

controller層實現:

/**
 * Sample controller exposed through the Netty HTTP server.
 * The class-level and method-level {@code actionKey} values are what the
 * action mapping is built from.
 */
@Component
@ActionMapping(actionKey="controller")
@ResponseType
public class HttpNettyController implements BaseActionController{

    /**
     * Echoes the two request parameters.
     *
     * @param a first parameter, filled from the query key of the same name
     * @param b second parameter, filled from the query key of the same name
     * @return the text {@code a:<a>,b:<b>}
     */
    @ActionMapping(actionKey = "method")
    public String method(String a, String b){
        return "a:" + a + ",b:" + b;
    }
}

ChannelPipeline 實現:

/**
 * Channel initializer that assembles the HTTP pipeline for every accepted connection.
 */
@Component
@ConditionalOnProperty(  // registered only when netty.http.enabled is set and not "false"
        value = {"netty.http.enabled"},
        matchIfMissing = false
)
public class HttpPipeline extends ChannelInitializer<SocketChannel> {

    /** Upper bound, in bytes, for an aggregated request body (1 MiB). */
    private static final int MAX_CONTENT_LENGTH = 1024 * 1024;

    @Autowired
    HttpServerHandler httpServerHandler;

    /**
     * Installs, in order: HTTP codec, response compressor, request aggregator,
     * chunked-write support and the shared root request handler.
     *
     * @param ch the newly accepted channel
     */
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
                .addLast(new HttpServerCodec())
                .addLast(new HttpContentCompressor())
                .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH))
                .addLast(new ChunkedWriteHandler())
                // root handler for HTTP requests
                .addLast(httpServerHandler);
    }

}

服務實現:

package cn.myframe.netty.server;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cn.myframe.netty.pipeline.HttpPipeline;
import cn.myframe.properties.NettyHttpProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Netty HTTP server bootstrap.
 *
 * <p>Registers a bean whose factory method starts the Netty server on a dedicated
 * thread, so the blocking accept loop does not hold up Spring context start-up.
 *
 * @version created 2019-06-25 17:39:36
 */
@Configuration
@EnableConfigurationProperties({NettyHttpProperties.class})
@ConditionalOnProperty(  // active only when netty.http.enabled is set and not "false"
        value = {"netty.http.enabled"},
        matchIfMissing = false
)
@Slf4j
public class HttpServer {

    @Autowired
    HttpPipeline httpPipeline;

    @Autowired
    NettyHttpProperties nettyHttpProperties;

    /**
     * Starts the HTTP server on a background thread named {@code http_Server}.
     *
     * <p>The thread is deliberately not a daemon thread.
     *
     * @return a fixed marker string; the bean exists only for its start-up side effect
     */
    @Bean("starHttpServer")
    public String start() {
        Thread thread = new Thread(this::runServer);
        thread.setName("http_Server");
        thread.start();
        return "http start";
    }

    /**
     * Binds the server socket and blocks until the server channel is closed,
     * then shuts both event-loop groups down.
     */
    private void runServer() {
        NioEventLoopGroup bossGroup =
                new NioEventLoopGroup(nettyHttpProperties.getBossThreads());
        NioEventLoopGroup workerGroup =
                new NioEventLoopGroup(nettyHttpProperties.getWorkThreads());
        try {
            log.info("start netty [HTTP] server ,port: {}", nettyHttpProperties.getPort());
            ServerBootstrap boot = new ServerBootstrap();
            options(boot).group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(httpPipeline);

            // Bind to a specific local address only when one is configured.
            Channel ch;
            if (StringUtils.isNotEmpty(nettyHttpProperties.getBindIp())) {
                ch = boot.bind(nettyHttpProperties.getBindIp(),
                        nettyHttpProperties.getPort()).sync().channel();
            } else {
                ch = boot.bind(nettyHttpProperties.getPort()).sync().channel();
            }
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread's owner can observe it.
            Thread.currentThread().interrupt();
            log.error("啟動NettyServer錯誤", e);
        } catch (Exception e) {
            // bind(...).sync() rethrows the bind failure (e.g. port already in use);
            // log it instead of letting the thread die with an uncaught exception.
            log.error("啟動NettyServer錯誤", e);
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Hook for applying server channel options (for example SO_BACKLOG) before binding.
     * Currently applies none.
     *
     * @param boot the bootstrap being configured
     * @return the same bootstrap, for chaining
     */
    private ServerBootstrap options(ServerBootstrap boot) {
        return boot;
    }

}

啟動配置:

---application.yml
spring.profiles.active: http

---application-http.yml
# Settings for the Netty HTTP server.
# NOTE(review): presumably bound to NettyHttpProperties under the "netty.http" prefix — confirm in that class.
netty:
   http:
     # Checked by @ConditionalOnProperty("netty.http.enabled") on HttpPipeline and HttpServer.
     enabled: true
     # Port the server binds to.
     port: 1999
     # Thread count of the boss event-loop group.
     bossThreads: 2
     # Thread count of the worker event-loop group.
     workThreads: 4

測試: