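Hand-rolling a simple gateway with Netty

I. Base code

  First, start three services on ports 8080, 8081 and 8082 using single-threaded BIO, multi-threaded BIO and thread-pool BIO respectively, in preparation for the later sections. Also write client code that calls them, once with HttpClient and once with OkHttpClient.

  1. Single-threaded BIO mode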

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class HttpServer01 {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            try {
                // Accept and serve one connection at a time on the main thread
                Socket socket = serverSocket.accept();
                service(socket);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void service(Socket socket) {
        String body = "hello lcl-nio-001";
        try {
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            printWriter.println("HTTP/1.1 200 OK");
            printWriter.println("Content-Type:text/html;charset=utf-8");
            printWriter.println("Content-Length:" + body.getBytes().length);
            printWriter.println();
            printWriter.write(body);
            printWriter.flush();
            printWriter.close();
            socket.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
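
  The response above is written with println, which emits the platform line separator, while HTTP/1.1 separates header lines with CRLF and Content-Length counts bytes rather than characters. A minimal sketch of building the same response with explicit CRLF and UTF-8 bytes could look like this; the class and method names (RawHttpResponse, build) are illustrative and not part of the original code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class RawHttpResponse {
    private RawHttpResponse() {}

    /** Builds a complete HTTP/1.1 200 response with CRLF line endings. */
    static byte[] build(String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        String head = "HTTP/1.1 200 OK\r\n"
                + "Content-Type: text/html;charset=utf-8\r\n"
                // Content-Length is the number of bytes, not characters
                + "Content-Length: " + bodyBytes.length + "\r\n"
                + "\r\n";
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);
        ByteArrayOutputStream out = new ByteArrayOutputStream(headBytes.length + bodyBytes.length);
        out.write(headBytes, 0, headBytes.length);
        out.write(bodyBytes, 0, bodyBytes.length);
        return out.toByteArray();
    }
}
```

  In the service method, the resulting byte array could be written straight to socket.getOutputStream() instead of going through a PrintWriter.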

  2. Multi-threaded mode

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8081);
        while (true){
            try{
                Socket socket = serverSocket.accept();
                new Thread(() -> {
                    service(socket);
                }).start();
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }

  3. Thread pool mode

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8082);
        ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4);
        while (true){
            try{
                Socket socket = serverSocket.accept();
                executorService.submit(()->service(socket));
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }

  4. OkHttpClient

        // client is an okhttp3.OkHttpClient instance
        Request request = new Request.Builder()
                .url(url)
                .get()
                .build();
        System.out.println(url);
        final Call call = client.newCall(request);
        Response response = call.execute();
        String result = response.body().string();

  5. HttpClient

        HttpGet request = new HttpGet(url);
        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(request)) {
            String result = EntityUtils.toString(response.getEntity());
            //System.out.println(url);
            System.out.println(result);
            return result;
        }

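II. Gateway V1.0: simple request forwarding

  The core functions of a gateway are forwarding, interception/filtering and routing, all while maintaining high performance. Gateway V1.0 therefore focuses first on forwarding messages with Netty.

   1. First create a Handler that returns different data depending on the request path. The data itself is fetched by calling the configured url with OkHttpClient; a different custom string is appended to the result before it is returned.

  Since this is V1.0, it simply takes the first entry of the url list without any routing; later versions improve on this.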

public class MyHttpHandler extends ChannelInboundHandlerAdapter {

    private List<String> urlList;

    public MyHttpHandler(List<String> urlList){
        this.urlList = urlList;
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
        String uri = fullHttpRequest.getUri();
        if("/test".equals(uri)){
            handlerTest(ctx, fullHttpRequest, "hello lcl");
        }else {
            handlerTest(ctx, fullHttpRequest, "hello other");
        }
        ctx.fireChannelRead(msg);
    }

    private void handlerTest(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, String body) {
        MyOkHttpClient okHttpClient = new MyOkHttpClient();
        FullHttpResponse response = null;


        try {
            String value = okHttpClient.testHttpGet(this.urlList.get(0)) + body;
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
            response.headers().set("Content-Type", "application/json");
            response.headers().set("Content-Length", response.content().readableBytes());
        } catch (Exception e) {
            // println avoids treating the message as a format string
            System.out.println("Request handling failed: " + e.getMessage());
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
        }finally {
            if(fullHttpRequest != null){
                if(!HttpUtil.isKeepAlive(fullHttpRequest)){
                    ctx.write(response).addListener(ChannelFutureListener.CLOSE);
                }else {
                    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE);
                    ctx.write(response);
                }
            }
        }
    }
}
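
  The handler compares the raw request URI with "/test", so a request such as /test?a=1 would take the else branch. A minimal sketch of matching on the path only, using plain JDK string handling, is shown below; RequestPaths, pathOf and suffixFor are hypothetical names introduced for illustration (Netty's QueryStringDecoder can serve a similar purpose).

```java
final class RequestPaths {
    private RequestPaths() {}

    /** Returns the path part of a request URI, dropping any query string or fragment. */
    static String pathOf(String uri) {
        int end = uri.length();
        int query = uri.indexOf('?');
        if (query >= 0) {
            end = query;
        }
        int fragment = uri.indexOf('#');
        if (fragment >= 0 && fragment < end) {
            end = fragment;
        }
        return uri.substring(0, end);
    }

    /** Mirrors the branch in channelRead: "/test" gets one suffix, everything else another. */
    static String suffixFor(String uri) {
        return "/test".equals(pathOf(uri)) ? "hello lcl" : "hello other";
    }
}
```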

  2. Create the ChannelPipeline initializer class and add the handler created above at the end of the pipeline

public class MyHttpInitializer extends ChannelInitializer<SocketChannel> {

    private List<String> urlList;

    public MyHttpInitializer(List<String> urlList){
        this.urlList = urlList;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(1024*1024));
        pipeline.addLast(new MyHttpHandler(urlList));
    }
}

  3. Start the Netty server

   Create a ServerBootstrap, set the relevant options, and register the initializer class created above as the childHandler.

public class MyNettyHttpServer {

    private int port;
    private List<String> urlList;

    public MyNettyHttpServer(int port, List<String> urlList){
        this.port = port;
        this.urlList = urlList;
    }


    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        EventLoopGroup workerGroup = new NioEventLoopGroup(16);


        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.SO_RCVBUF, 21*1024)
                    .childOption(ChannelOption.SO_SNDBUF, 32*1024)
                    .childOption(EpollChannelOption.SO_REUSEPORT, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new MyHttpInitializer(urlList));

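            // sync() waits for the bind to finish so that a failed bind is reported here
            Channel channel = serverBootstrap.bind(port).sync().channel();
            System.out.println("Netty HTTP server started, listening on http://127.0.0.1:" + port + '/');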
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

  4. When the main function starts, initialize the HTTP server and set the listening port and the urls to forward to

@SpringBootApplication
public class MyGatewayApplication {

    public static void main(String[] args) {
        int port = 8888;
        String url = "http://localhost:8080,http://localhost:8081";
        SpringApplication.run(MyGatewayApplication.class, args);
        MyNettyHttpServer myNettyHttpServer = new MyNettyHttpServer(port, Arrays.asList(url.split(",")));
        myNettyHttpServer.start();
    }
}
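
  The backend list above comes from splitting a comma-separated string. A slightly more defensive parse, sketched below under the assumption that the list may contain stray spaces or trailing slashes, trims each entry and drops empty ones; BackendList and parse are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class BackendList {
    private BackendList() {}

    /** Splits a comma-separated list of base urls into clean entries. */
    static List<String> parse(String commaSeparated) {
        List<String> urls = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String url = part.trim();
            // Drop a trailing slash so "base + requestUri" never yields "//"
            while (url.endsWith("/")) {
                url = url.substring(0, url.length() - 1);
            }
            if (!url.isEmpty()) {
                urls.add(url);
            }
        }
        return urls;
    }
}
```

  Removing the trailing slash matters once the request URI is appended to the chosen base url, as V3.0 does.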

  5. Verification

conglongli@localhost ~ % curl http://localhost:8888/demo
hello lcl-nio-001hello other
conglongli@localhost ~ % curl http://localhost:8888/test
hello lcl-nio-001hello lcl

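  At this point, the simplest possible gateway is complete.

III. Gateway V2.0: simple interception and filtering

  Building on V1.0, V2.0 adds interception and filtering in two places: on the incoming request and on the outgoing response.

  1. First create the request and response filter interfaces, then implement one filter for each that adds a header.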

public interface HttpRequestFilter {
    void filter(FullHttpRequest request, ChannelHandlerContext ctx);
}
public interface HttpResponseFilter {
    void filter(FullHttpResponse response);
}
public class HeaderHttpHttpRequestFilter implements HttpRequestFilter {
    @Override
    public void filter(FullHttpRequest request, ChannelHandlerContext ctx) {
        request.headers().set("testkey", "lcl");
        System.out.println("=========" + request.headers().get("testkey"));
    }
}
public class HeaderHttpHttpResponseFilter implements HttpResponseFilter {
    @Override
    public void filter(FullHttpResponse response) {
        response.headers().set("testkey","ml");
    }
}
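
  Each filter above sets a single header. Once there is more than one filter, they need to run in a defined order. The sketch below shows one way to chain them; to stay free of Netty types it works on a plain header map, so HeaderFilter and HeaderFilterChain are stand-ins introduced for illustration, not the interfaces defined above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Stand-in for HttpRequestFilter / HttpResponseFilter, operating on a plain header map. */
interface HeaderFilter {
    void filter(Map<String, String> headers);
}

final class HeaderFilterChain implements HeaderFilter {
    private final List<HeaderFilter> filters = new ArrayList<>();

    /** Appends a filter and returns the chain so calls can be strung together. */
    HeaderFilterChain add(HeaderFilter filter) {
        filters.add(filter);
        return this;
    }

    @Override
    public void filter(Map<String, String> headers) {
        // Filters run in registration order; a later filter can overwrite an earlier one
        for (HeaderFilter f : filters) {
            f.filter(headers);
        }
    }
}
```

  Because the chain itself implements the filter interface, the calling code would not need to know whether it holds one filter or several.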

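  2. Create an HttpOutboundHandler. It intercepts the request and calls the filter above to set the header; only then does it make the HTTP call. Once the call completes, it filters the response and sets its header.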

public class HttpOutboundHandler {

    private List<String> urlList;
    HttpResponseFilter responseFilter = new HeaderHttpHttpResponseFilter();

    public HttpOutboundHandler(List<String> urlList){
        this.urlList = urlList;
    }


    public void handler(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, HttpRequestFilter filter){
        filter.filter(fullHttpRequest, ctx);
        MyOkHttpClient okHttpClient = new MyOkHttpClient();
        FullHttpResponse response = null;
        try {
            String value = okHttpClient.testHttpGet(this.urlList.get(0));
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));

            response.headers().set("Content-Type", "application/json");
            response.headers().set("Content-Length", response.content().readableBytes());
            responseFilter.filter(response);
        } catch (Exception e) {
            // println avoids treating the message as a format string
            System.out.println("Request handling failed: " + e.getMessage());
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
        }finally {
            if(fullHttpRequest != null){
                if(!HttpUtil.isKeepAlive(fullHttpRequest)){
                    ctx.write(response).addListener(ChannelFutureListener.CLOSE);
                }else {
                    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE);
                    ctx.write(response);
                }
            }
        }
    }
}

   3. Modify the V1.0 MyHttpHandler to call the handler method above directly

public class MyHttpHandler extends ChannelInboundHandlerAdapter {

    private List<String> urlList;

    private HttpOutboundHandler httpOutboundHandler;

    HttpRequestFilter httpRequestFilter = new HeaderHttpHttpRequestFilter();

    public MyHttpHandler(List<String> urlList){
        this.urlList = urlList;
        this.httpOutboundHandler = new HttpOutboundHandler(this.urlList);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
        String uri = fullHttpRequest.getUri();
//        if("/test".equals(uri)){
//            handlerTest(ctx, fullHttpRequest, "hello lcl");
//        }else {
//            handlerTest(ctx, fullHttpRequest, "hello other");
//        }
        httpOutboundHandler.handler(fullHttpRequest, ctx, httpRequestFilter);
        ctx.fireChannelRead(msg);
    }
}

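IV. Gateway V3.0: simple routing

  V2.0 added interception and filtering; V3.0 adds a routing strategy.

  1. Write a Router interface and an implementation class to provide routing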

public interface HttpEndpointRouter {
    String getUrl(List<String> urlList);
}

  There can be many implementations, such as round-robin, random or least-connections. Here the simplest one, random, is implemented.

public class RandomHttpRouter implements HttpEndpointRouter{
    @Override
    public String getUrl(List<String> urlList) {
        // java.util.concurrent.ThreadLocalRandom avoids re-seeding a new Random on every call
        int index = ThreadLocalRandom.current().nextInt(urlList.size());
        return urlList.get(index);
    }
}
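
  Round-robin, mentioned above as an alternative strategy, can be sketched against the same interface shape. The interface is repeated here only so the example compiles on its own; RoundRobinHttpRouter is a hypothetical class, not part of the original code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Same shape as the HttpEndpointRouter interface above, repeated so the sketch compiles alone. */
interface HttpEndpointRouter {
    String getUrl(List<String> urlList);
}

final class RoundRobinHttpRouter implements HttpEndpointRouter {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public String getUrl(List<String> urlList) {
        // floorMod keeps the index non-negative after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), urlList.size());
        return urlList.get(index);
    }
}
```

  The AtomicInteger keeps the rotation correct when several threads ask for a url at the same time, which matters because a single router instance is shared across requests.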

  2. Then change how the url is obtained in V2.0

FullHttpResponse response = null;
        try {
            String uri = fullHttpRequest.getUri();
            String url = router.getUrl(this.urlList) + uri;
            String value = okHttpClient.testHttpGet(url);
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));

            response.headers().set("Content-Type", "application/json");
            response.headers().set("Content-Length", response.content().readableBytes());
            responseFilter.filter(response);

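  Routing still leaves plenty of room for improvement, for example checking whether a backend address is actually available before selecting it.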
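
  One way to take availability into account is to filter the candidates before picking one. The sketch below assumes some external mechanism (for example a periodic probe) reports whether a backend is healthy and exposes it as a Predicate; HealthAwareRouter and the predicate are hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

final class HealthAwareRouter {
    private final Predicate<String> isHealthy;

    HealthAwareRouter(Predicate<String> isHealthy) {
        this.isHealthy = isHealthy;
    }

    /** Picks a random backend among those currently reported healthy, if any. */
    Optional<String> getUrl(List<String> urlList) {
        List<String> healthy = new ArrayList<>();
        for (String url : urlList) {
            if (isHealthy.test(url)) {
                healthy.add(url);
            }
        }
        if (healthy.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(healthy.get(ThreadLocalRandom.current().nextInt(healthy.size())));
    }
}
```

  Returning an Optional makes the "no backend available" case explicit, so the gateway can answer with an error status instead of calling a dead address.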

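V. Performance optimization: thread pool and asynchronous calls

  The sections above cover the most basic implementations of forwarding, interception/filtering and routing. What remains is making the gateway perform well.

  The calls above are synchronous. To improve performance, they can be made through a thread pool and asynchronously, with the result retrieved through a Future.

  First build the thread pool, initializing it in the constructor of OkHttpOutboundHandler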

    public OkHttpOutboundHandler(List<String> urlList){
        this.urlList = urlList;

        // Define the thread pool and its parameters
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        long keepAliveTime = 1000;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2048);
        ThreadFactory threadFactory = new NamedThreadFactory("proxy-service");
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        proxyServicePool = new ThreadPoolExecutor(corePoolSize,corePoolSize,keepAliveTime,
                TimeUnit.MILLISECONDS, workQueue, threadFactory, handler);
    }

   This uses a thread factory whose job is to create threads and to set each thread's name and whether it is a daemon thread.

public class NamedThreadFactory implements ThreadFactory {
    private final String prefixName;
    private final ThreadGroup threadGroup;
    private final boolean demon;
    private final AtomicInteger threadNumber = new AtomicInteger(1);


    public NamedThreadFactory(String prefixName) {
        this(prefixName,false);
    }

    public NamedThreadFactory(String prefixName, boolean demon) {
        this.prefixName = prefixName;
        this.demon = demon;
        SecurityManager securityManager = System.getSecurityManager();
        this.threadGroup = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
    }

    @Override
    public Thread newThread(@NotNull Runnable r) {
        // getAndIncrement so that the first thread is numbered 1
        Thread t = new Thread(threadGroup, r,prefixName+"-thread-"+threadNumber.getAndIncrement(), 0);
        t.setDaemon(demon);
        return t;
    }
}
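
  To see the effect of a naming thread factory, the following sketch plugs a simplified factory into a single-thread executor and reads back the worker thread's name. PrefixThreadFactory and workerName are hypothetical names used only for this illustration; the factory omits the thread group handling of the class above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class PrefixThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger next = new AtomicInteger(1);

    PrefixThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-thread-" + next.getAndIncrement());
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    }

    /** Runs one task on a single-thread pool and returns the name of the worker thread. */
    static String workerName(String prefix) {
        ExecutorService pool = Executors.newSingleThreadExecutor(new PrefixThreadFactory(prefix));
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

  Named threads make it much easier to tell gateway worker threads apart from Netty event loop threads in thread dumps and logs.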

  In the handler method, filter the Request, pick a service according to the routing strategy, and finally submit the remote call task to the thread pool

    public void handler(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, HttpRequestFilter filter){
        filter.filter(fullHttpRequest, ctx);
        String uri = fullHttpRequest.getUri();
        String url = router.getUrl(this.urlList) + uri;
        proxyServicePool.submit(() -> fetchhttpGet(fullHttpRequest, ctx, url));
    }

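  The fetchhttpGet method is the same as before: it makes the remote call directly, filters the Response at the end of the call, and finally uses ctx.flush() to send the result back to the client.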

    private void fetchhttpGet(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, String url) {
        MyOkHttpClient okHttpClient = new MyOkHttpClient();
        FullHttpResponse response = null;
        try {
            String value = okHttpClient.testHttpGet(url);
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));

            response.headers().set("Content-Type", "application/json");
            response.headers().set("Content-Length", response.content().readableBytes());
            responseFilter.filter(response);
        } catch (Exception e) {
            // println avoids treating the message as a format string
            System.out.println("Request handling failed: " + e.getMessage());
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
        }finally {
            if(fullHttpRequest != null){
                if(!HttpUtil.isKeepAlive(fullHttpRequest)){
                    ctx.write(response).addListener(ChannelFutureListener.CLOSE);
                }else {
                    //response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE);
                    ctx.write(response);
                }
            }
            ctx.flush();
        }
    }
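
  The submit-then-write pattern above can also be expressed with CompletableFuture, which makes the error path explicit. In the sketch below, the Function stands in for the blocking okHttpClient.testHttpGet call and the returned future stands in for the point where the response would be written; AsyncProxy and forward are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

final class AsyncProxy {
    private final Executor pool;
    private final Function<String, String> fetch; // stand-in for the blocking HTTP call

    AsyncProxy(Executor pool, Function<String, String> fetch) {
        this.pool = pool;
        this.fetch = fetch;
    }

    /** Runs the blocking fetch on the pool and maps any failure to a fallback body. */
    CompletableFuture<String> forward(String url) {
        return CompletableFuture
                .supplyAsync(() -> fetch.apply(url), pool)
                .exceptionally(error -> "upstream error");
    }
}
```

  In the gateway, the pool would be proxyServicePool, and a thenAccept stage on the returned future would build the response and write it to the channel.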

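  This completes the thread-pool-based handling for better concurrency and throughput. The HTTP call itself is still synchronous, however, so the next step is to use an asynchronous HTTP client to improve concurrency further.

  First initialize the client in the constructor of OkHttpOutboundHandler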

   private CloseableHttpAsyncClient httpClient;
    HttpResponseFilter responseFilter = new HeaderHttpHttpResponseFilter();
    HttpEndpointRouter router = new RandomHttpRouter();

    public OkHttpOutboundHandler(List<String> urlList){
        this.urlList = urlList;

        // Define the thread pool and its parameters
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        long keepAliveTime = 1000;
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2048);
        ThreadFactory threadFactory = new NamedThreadFactory("proxy-service");
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        proxyServicePool = new ThreadPoolExecutor(corePoolSize,corePoolSize,keepAliveTime,
                TimeUnit.MILLISECONDS, workQueue, threadFactory, handler);


        IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                .setConnectTimeout(1000)
                .setSoTimeout(1000)
                .setIoThreadCount(corePoolSize)
                .setRcvBufSize(32*1024).build();

        httpClient = HttpAsyncClients.custom().setMaxConnTotal(40)
                .setMaxConnPerRoute(8)
                .setDefaultIOReactorConfig(ioReactorConfig)
                .setKeepAliveStrategy(((httpResponse, httpContext) -> 6000))
                .build();
        httpClient.start();
    }

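   Then, in the fetchGet method, submit the request asynchronously with the client's execute method and receive the result through a FutureCallback. In the callback, a successful call hands the result on for processing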

    private void fetchGet(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, String url) {
        final HttpGet httpGet = new HttpGet(url);
        httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
        httpClient.execute(httpGet, new FutureCallback<org.apache.http.HttpResponse>() {
            @Override
            public void completed(org.apache.http.HttpResponse httpResponse) {
                try {
                    handlerResponse(fullHttpRequest, ctx, httpResponse);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Exception e) {
                httpGet.abort();
                e.printStackTrace();
            }

            @Override
            public void cancelled() {
                httpGet.abort();
            }
        });
    }

   If the call succeeds, handlerResponse is called to process the result. As before, it filters the response and finally uses ctx.flush() to return the result to the client.

    private void handlerResponse(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, org.apache.http.HttpResponse httpResponse) throws Exception {
        FullHttpResponse response = null;
        try {
            byte[] body = EntityUtils.toByteArray(httpResponse.getEntity());
            //String value = okHttpClient.testHttpGet(url);
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(body));

            response.headers().set("Content-Type", "application/json");
            response.headers().set("Content-Length", response.content().readableBytes());
            responseFilter.filter(response);
        } catch (Exception e) {
            // println avoids treating the message as a format string
            System.out.println("Request handling failed: " + e.getMessage());
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
        }finally {
            if(fullHttpRequest != null){
                if(!HttpUtil.isKeepAlive(fullHttpRequest)){
                    ctx.write(response).addListener(ChannelFutureListener.CLOSE);
                }else {
                    //response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE);
                    ctx.write(response);
                }
            }
            ctx.flush();
        }
    }
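
  In the callback shown earlier, the failed and cancelled branches only abort the request, so the client waiting on the gateway receives no response in those cases. One possible way to cover every outcome is to bridge the callback to a CompletableFuture. The sketch below uses a hypothetical Callback interface with the same three methods as the FutureCallback used above, so that it compiles without the HTTP client library; FutureBridge is likewise an illustrative name.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical callback with the same three methods as the FutureCallback used above. */
interface Callback<T> {
    void completed(T result);
    void failed(Exception error);
    void cancelled();
}

final class FutureBridge<T> implements Callback<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();

    CompletableFuture<T> future() {
        return future;
    }

    @Override
    public void completed(T result) {
        future.complete(result);
    }

    @Override
    public void failed(Exception error) {
        // Propagate the failure so the caller can still answer the client
        future.completeExceptionally(error);
    }

    @Override
    public void cancelled() {
        future.completeExceptionally(new CancellationException("request cancelled"));
    }
}
```

  With such a bridge, the gateway could attach a single completion stage that writes either the upstream body or an error status to the channel, whichever way the call ends.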