使用Netty手擼一個簡單的閘道器
一、基礎程式碼
首先使用BIO單執行緒、BIO多執行緒、BIO執行緒池的方式啟動埠為8080、8081、8082三個服務,為後續做準備,同時編寫客戶端呼叫,使用HttpClient和OkHttpClient來分別呼叫。
1、BIO模式單執行緒模式
public class HttpServer01 { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8080); while (true){try{ Socket socket = serverSocket.accept(); service(socket); }catch (Exception e){ e.printStackTrace(); } } } private static void service(Socket socket) { String body = "hello lcl-nio-001"; try{ PrintWriter printWriter= new PrintWriter(socket.getOutputStream(), true); printWriter.println("HTTP/1.1 200 OK"); printWriter.println("Content-Type:text/html;charset=utf-8"); printWriter.println("Content-Length:" + body.getBytes().length); printWriter.println(); printWriter.write(body); printWriter.flush(); printWriter.close(); socket.close(); }catch (Exception e){ } }
2、多執行緒模式
public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8081); while (true){ try{ Socket socket = serverSocket.accept(); new Thread(() -> { service(socket); }).start(); }catch (Exception e){ e.printStackTrace(); } } }
3、執行緒池模式
public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8082); ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4); while (true){ try{ Socket socket = serverSocket.accept(); executorService.submit(()->service(socket)); }catch (Exception e){ e.printStackTrace(); } } }
4、HttpClient
Request request = new Request.Builder() .url(url) .get() .build(); System.out.println(url); final Call call = client.newCall(request); Response response = call.execute(); String result = response.body().string();
5、OkHttpClient
HttpGet request = new HttpGet(url); try (CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse response = httpClient.execute(request)) { String result = EntityUtils.toString(response.getEntity()); //System.out.println(url); System.out.println(result); return result; }
二、閘道器 V1.0:實現簡單請求轉發
閘道器的核心功能就是轉發、攔截過濾、路由,同時要保證高效能,那麼閘道器 V1.0 首先關注點在於使用Netty實現訊息的轉發。
1、首先建立一個Handler,根據請求路徑不同,新增不同的返回資料,具體的資料,根據傳入的url地址使用OkHttpClient進行呼叫,使用返回結果拼裝上不同的自定義字串後返回。
由於是 V1.0 版本,所以先直接取了url集合中的第一個,沒有做路由處理,後面版本會做優化。
public class MyHttpHandler extends ChannelInboundHandlerAdapter { private List<String> urlList; public MyHttpHandler(List<String> urlList){ this.urlList = urlList; } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; String uri = fullHttpRequest.getUri(); if("/test".equals(uri)){ handlerTest(ctx, fullHttpRequest, "hello lcl"); }else { handlerTest(ctx, fullHttpRequest, "hello other"); } ctx.fireChannelRead(msg); } private void handlerTest(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, String body) { MyOkHttpClient okHttpClient = new MyOkHttpClient(); FullHttpResponse response = null; try { String value = okHttpClient.testHttpGet(this.urlList.get(0)) + body; response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); response.headers().set("Content-Type", "application/json"); response.headers().set("Content-Length", response.content().readableBytes()); } catch (Exception e) { System.out.printf("處理異常" + e.getMessage()); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); }finally { if(fullHttpRequest != null){ if(!HttpUtil.isKeepAlive(fullHttpRequest)){ ctx.write(response).addListener(ChannelFutureListener.CLOSE); }else { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE); ctx.write(response); } } } } }
2、建立ChannelPipeline初始化類,在Pipeline最後新增上上面建立的handler
public class MyHttpInitializer extends ChannelInitializer<SocketChannel> { private List<String> urlList; public MyHttpInitializer(List<String> urlList){ this.urlList = urlList; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(1024*1024)); pipeline.addLast(new MyHttpHandler(urlList)); } }
3、啟動NettyServer
建立ServerBootstrap,設定相關的引數,將上面建立的初始化類設定到childHander中。
public class MyNettyHttpServer { private int port; private List<String> urlList; public MyNettyHttpServer(int port, List<String> urlList){ this.port = port; this.urlList = urlList; } public void start() { EventLoopGroup bossGroup = new NioEventLoopGroup(2); EventLoopGroup workerGroup = new NioEventLoopGroup(16); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_RCVBUF, 21*1024) .childOption(ChannelOption.SO_SNDBUF, 32*1024) .childOption(EpollChannelOption.SO_REUSEPORT, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new MyHttpInitializer(urlList)); Channel channel = serverBootstrap.bind(port).channel(); System.out.println("開啟netty http伺服器,監聽地址和埠為 http://127.0.0.1:" + port + '/'); channel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
4、主函式啟動時,初始化HttpServer,並設定監聽埠和需要轉發的Url
@SpringBootApplication public class MyGatewayApplication { public static void main(String[] args) { int port = 8888; String url = "http://localhost:8080,http://localhost:8081"; SpringApplication.run(MyGatewayApplication.class, args); MyNettyHttpServer myNettyHttpServer = new MyNettyHttpServer(port, Arrays.asList(url.split(","))); myNettyHttpServer.start(); } }
5、驗證
conglongli@localhost ~ % curl http://localhost:8888/demo hello lcl-nio-001hello other conglongli@localhost ~ % curl http://localhost:8888/test hello lcl-nio-001hello lcl
至此,一個最簡單的閘道器已經完成。
三、閘道器 V2.0:實現簡單攔截過濾
在閘道器 V1.0的基礎上,V2.0要加入攔截過濾的功能,主要是兩個方面,一方面是對請求的攔截過濾,另一方面是對返回的攔截過濾。
1、首先建立對應的請求響應過濾介面,然後各實現一個新增Header的過濾器。
public interface HttpRequestFilter { void filter(FullHttpRequest request, ChannelHandlerContext ctx); }
public interface HttpResponseFilter { void filter(FullHttpResponse response); }
public class HeaderHttpHttpRequestFilter implements HttpRequestFilter { @Override public void filter(FullHttpRequest request, ChannelHandlerContext ctx) { request.headers().set("testkey", "lcl"); System.out.printf("=========" + request.headers().get("testkey")); } }
public class HeaderHttpHttpResponseFilter implements HttpResponseFilter { @Override public void filter(FullHttpResponse response) { response.headers().set("testkey","ml"); } }
2、建立一個HttpOutboundHandler,在其中主要就是攔截請求,呼叫上面的filter對header做設定,設定完成後才發起http呼叫,呼叫完成後,對於response做過濾,設定頭資訊。
public class HttpOutboundHandler { private List<String> urlList; HttpResponseFilter responseFilter = new HeaderHttpHttpResponseFilter(); public HttpOutboundHandler(List<String> urlList){ this.urlList = urlList; } public void handler(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, HttpRequestFilter filter){ filter.filter(fullHttpRequest, ctx); MyOkHttpClient okHttpClient = new MyOkHttpClient(); FullHttpResponse response = null; try { String value = okHttpClient.testHttpGet(this.urlList.get(0)); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); response.headers().set("Content-Type", "application/json"); response.headers().set("Content-Length", response.content().readableBytes()); responseFilter.filter(response); } catch (Exception e) { System.out.printf("處理異常" + e.getMessage()); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); }finally { if(fullHttpRequest != null){ if(!HttpUtil.isKeepAlive(fullHttpRequest)){ ctx.write(response).addListener(ChannelFutureListener.CLOSE); }else { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE); ctx.write(response); } } } } }
3、修改 V1.0的MyhttpHander,直接呼叫上面的handler方法
public class MyHttpHandler extends ChannelInboundHandlerAdapter { private List<String> urlList; private HttpOutboundHandler httpOutboundHandler; HttpRequestFilter httpRequestFilter = new HeaderHttpHttpRequestFilter(); public MyHttpHandler(List<String> urlList){ this.urlList = urlList; this.httpOutboundHandler = new HttpOutboundHandler(this.urlList); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; String uri = fullHttpRequest.getUri(); // if("/test".equals(uri)){ // handlerTest(ctx, fullHttpRequest, "hello lcl"); // }else { // handlerTest(ctx, fullHttpRequest, "hello other"); // } httpOutboundHandler.handler(fullHttpRequest, ctx, httpRequestFilter); ctx.fireChannelRead(msg); }
四、閘道器 V3.0:實現簡單路由
在 V2.0 中添加了攔截過濾,在 V3.0 中新增路由策略
1、編寫一個Router介面和實現類,實現路由功能
public interface HttpEndpointRouter { String getUrl(List<String> urlList); }
實現類可以有多種,例如輪詢、隨機、最小連線數等等,這裡實現一個最簡單的隨機
public class RandomHttpRouter implements HttpEndpointRouter{ @Override public String getUrl(List<String> urlList) { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(urlList.size()); return urlList.get(index); } }
2、然後修改 V2.0 中獲取url的方法即可
FullHttpResponse response = null; try { String uri = fullHttpRequest.getUri(); String url = router.getUrl(this.urlList) + uri; String value = okHttpClient.testHttpGet(url); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); response.headers().set("Content-Type", "application/json"); response.headers().set("Content-Length", response.content().readableBytes()); responseFilter.filter(response);
在路由這一塊,還有很多可以優化的,例如是否要判斷ip是否可用等。
五、高效能優化:使用執行緒池和非同步呼叫
上面基本上已經涵蓋了轉發、攔截過濾、路由這些最基本的簡單實現,然後就是如何保證閘道器的高效能。
上面使用的是同步呼叫的方式,那麼為了提高效能,可以使用非同步和執行緒池的方式進行呼叫,同時使用Future獲取返回資訊。
首先構建執行緒池,在OkHttpOutboundHandler的建構函式中初始化執行緒池
public OkHttpOutboundHandler(List<String> urlList){ this.urlList = urlList; // 定義執行緒池及引數 int corePoolSize = Runtime.getRuntime().availableProcessors(); long keepAliveTime = 1000; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2048); ThreadFactory threadFactory = new NamedThreadFactory("proxy-service"); RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); proxyServicePool = new ThreadPoolExecutor(corePoolSize,corePoolSize,keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory, handler); }
在其中定義了一個執行緒工廠,主要就是用來生成執行緒,並設定執行緒名稱和是否是守護執行緒。
public class NamedThreadFactory implements ThreadFactory { private final String prefixName; private final ThreadGroup threadGroup; private final boolean demon; private final AtomicInteger threadNumber = new AtomicInteger(1); public NamedThreadFactory(String prefixName) { this(prefixName,false); } public NamedThreadFactory(String prefixName, boolean demon) { this.prefixName = prefixName; this.demon = demon; SecurityManager securityManager = System.getSecurityManager(); this.threadGroup = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup(); } @Override public Thread newThread(@NotNull Runnable r) { Thread t = new Thread(threadGroup, r,prefixName+"-thread-"+threadNumber.incrementAndGet(), 0); t.setDaemon(demon); return t; } }
在handler方法中,對Request進行攔截過濾,根據路由策略選擇一個服務,最後將遠端呼叫人體提交到執行緒池
public void handler(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, HttpRequestFilter filter){ filter.filter(fullHttpRequest, ctx); String uri = fullHttpRequest.getUri(); String url = router.getUrl(this.urlList) + uri; proxyServicePool.submit(() -> fetchhttpGet(fullHttpRequest, ctx, url)); }
在fetchhttpget方法中與之前沒有區別,直接使用遠端呼叫,在呼叫的最後,對Response進行攔截過濾,最後使用ctx.flush()方法將返回結果通知給客戶端。
private void fetchhttpGet(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, String url) { MyOkHttpClient okHttpClient = new MyOkHttpClient(); FullHttpResponse response = null; try { String value = okHttpClient.testHttpGet(url); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8"))); response.headers().set("Content-Type", "application/json"); response.headers().set("Content-Length", response.content().readableBytes()); responseFilter.filter(response); } catch (Exception e) { System.out.printf("處理異常" + e.getMessage()); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); }finally { if(fullHttpRequest != null){ if(!HttpUtil.isKeepAlive(fullHttpRequest)){ ctx.write(response).addListener(ChannelFutureListener.CLOSE); }else { //response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE); ctx.write(response); } } ctx.flush(); } }
這樣就完成了通過執行緒池提高併發和效能的處理,但是對於http呼叫來說,仍然是同步的,接下來採用非同步請求客戶端來提高併發效能。
首先在初始化OkHttpOutboundHandler的建構函式中初始化客戶端
private CloseableHttpAsyncClient httpClient; HttpResponseFilter responseFilter = new HeaderHttpHttpResponseFilter(); HttpEndpointRouter router = new RandomHttpRouter(); public OkHttpOutboundHandler(List<String> urlList){ this.urlList = urlList; // 定義執行緒池及引數 int corePoolSize = Runtime.getRuntime().availableProcessors(); long keepAliveTime = 1000; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2048); ThreadFactory threadFactory = new NamedThreadFactory("proxy-service"); RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); proxyServicePool = new ThreadPoolExecutor(corePoolSize,corePoolSize,keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory, handler); IOReactorConfig ioReactorConfig = IOReactorConfig.custom() .setConnectTimeout(1000) .setSoTimeout(1000) .setIoThreadCount(corePoolSize) .setRcvBufSize(32*1024).build(); httpClient = HttpAsyncClients.custom().setMaxConnTotal(40) .setMaxConnPerRoute(8) .setDefaultIOReactorConfig(ioReactorConfig) .setKeepAliveStrategy(((httpResponse, httpContext) -> 6000)) .build(); httpClient.start(); }
然後在fetchhttpGet方法中使用客戶端的execute方法將請求非同步提交,並且使用Future非同步獲取結果,在結果監聽中,如果呼叫成功則處理呼叫結果
private void fetchGet(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, String url) { final HttpGet httpGet = new HttpGet(url); httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE); httpClient.execute(httpGet, new FutureCallback<org.apache.http.HttpResponse>() { @Override public void completed(org.apache.http.HttpResponse httpResponse) { try { handlerResponse(fullHttpRequest, ctx, httpResponse); } catch (Exception e) { e.printStackTrace(); } }
@Override public void failed(Exception e) { httpGet.abort(); e.printStackTrace(); } @Override public void cancelled() { httpGet.abort(); } }); }
如果呼叫成功,則呼叫handlerResponse方法處理呼叫結果,在該方法中就是和前面一樣的對結果進行攔截過濾,並最後使用ctx.flush()將結果返回給客戶端。
private void handlerResponse(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx, org.apache.http.HttpResponse httpResponse) throws Exception { FullHttpResponse response = null; try { byte[] body = EntityUtils.toByteArray(httpResponse.getEntity()); //String value = okHttpClient.testHttpGet(url); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(body)); response.headers().set("Content-Type", "application/json"); response.headers().set("Content-Length", response.content().readableBytes()); responseFilter.filter(response); } catch (Exception e) { System.out.printf("處理異常" + e.getMessage()); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); }finally { if(fullHttpRequest != null){ if(!HttpUtil.isKeepAlive(fullHttpRequest)){ ctx.write(response).addListener(ChannelFutureListener.CLOSE); }else { //response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.KEEP_ALIVE); ctx.write(response); } } ctx.flush(); } }