基於netty實現的遠端服務框架
阿新 • • 發佈:2018-12-24
HSF服務管理平臺
基於netty實現遠端服務框架,為終端提供REST形式的HTTP服務。
目前只實現部分功能,可以提供REST形式和傳統形式的HTTP服務,其特點主要包括:
- 基於netty實現http協議開發,作為服務端和客戶端的通訊橋樑
- 利用zk管理服務提供者,實現分佈是部署
- 通過路由平臺,隨機分發請求,保證負載均衡
- 動態監控服務提供者的存活狀態
- 服務提供者開發簡單,易於接入
一、架構設計
二、流程
三、服務端介紹
服務提供者引入一個核心jar,通過xml配置,即可釋出服務。
核心jar的部分程式碼介紹
ZookeeperFactory類:
package com.ab.hsf.zk; import com.ab.hsf.constants.Constants; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import java.util.List; /** * zookeeper工廠類 */ public class ZookeeperFactory { public static Logger logger = Logger.getLogger(ZookeeperFactory.class); /** * zookeeper服務地址 */ private String hosts; /** * 回話的超時時間(毫秒) */ private Integer sessionTimeOut; /** * 連線的超時時間(毫秒) */ private Integer connectionTimeOut; /** * 名稱空間 */ private String nameSpace; /** * zookeeper管理物件 */ private CuratorFramework zkTools; /** * 應用ip:port */ private String appAddress; /** * 連線狀態 */ private String connectionState; /** * 連線 */ public void connection() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, Integer.MAX_VALUE); zkTools = CuratorFrameworkFactory .builder() .connectString(hosts) .namespace(nameSpace) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeOut == null ? 30000 : connectionTimeOut) .sessionTimeoutMs(sessionTimeOut == null ? 300000 : sessionTimeOut) .build(); zkTools.start(); connectionState = "CONNECTED"; addListener(); } /** * 註冊 * * @param interFaceIds 介面服務列表 */ public void register(List<String> interFaceIds) { if (interFaceIds == null) { logger.error("interface list is null"); return; } try { for (String interFaceId : interFaceIds) { String interFaceIdNode = Constants.SEPARATOR + interFaceId; //節點路徑 if (connectionState != null && (connectionState.equals("CONNECTED") || connectionState.equals("RECONNECTED"))) { if (zkTools.checkExists().forPath(interFaceIdNode) == null) { //無當前節點 zkTools.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(interFaceIdNode);//建立的路徑和值 } String ipNode = interFaceIdNode + Constants.SEPARATOR + this.getAppAddress(); //節點路徑 if (zkTools.checkExists().forPath(ipNode) != null) { //有當前IP的接點,則刪除後,重新建立 zkTools.delete().forPath(ipNode); } zkTools.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(ipNode);//建立的路徑和值 } } } catch (Exception e) { logger.error("create zookeeper node failure", e); } } /** * 連線狀態監聽 */ public void addListener() { zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() { public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState.equals(ConnectionState.CONNECTED)) { logger.info("連線"); connectionState = "CONNECTED"; } if (newState.equals(ConnectionState.RECONNECTED)) { logger.info("重新連線"); connectionState = "RECONNECTED"; connection(); } if (newState.equals(ConnectionState.LOST)) { logger.info("丟失"); connectionState = "LOST"; } if (newState.equals(ConnectionState.SUSPENDED)) { logger.info("暫停"); connectionState = "SUSPENDED"; } if (newState.equals(ConnectionState.READ_ONLY)) { logger.info("只讀"); connectionState = "READ_ONLY"; } } }); } /** * 關閉連線 */ public void close() { if (zkTools != null) { zkTools.close(); zkTools = null; } } public String getHosts() { return hosts; } public void setHosts(String hosts) { this.hosts = hosts; } public Integer getSessionTimeOut() { return sessionTimeOut; } public void setSessionTimeOut(Integer sessionTimeOut) { this.sessionTimeOut = sessionTimeOut; } public Integer getConnectionTimeOut() { return connectionTimeOut; } public void setConnectionTimeOut(Integer connectionTimeOut) { this.connectionTimeOut = connectionTimeOut; } public String getNameSpace() { return nameSpace; } public void setNameSpace(String nameSpace) { this.nameSpace = nameSpace; } public String getAppAddress() { return appAddress; } public void setAppAddress(String appAddress) { this.appAddress = appAddress; } }
netty實現部分程式碼:
package com.ab.hsf.server.http; import com.ab.hsf.server.HsfServer; import com.ab.hsf.server.http.handler.HsfHttpServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; /** * http服務類 * User: alex * DateTime: 15-7-23 下午2:03 */ public class HsfHttpServer implements HsfServer { protected ServerBootstrap bootstrap = new ServerBootstrap(); protected EventLoopGroup bossGroup = new NioEventLoopGroup(); protected EventLoopGroup workerGroup = new NioEventLoopGroup(); protected int port = 8080; private int backlog = 128; private int maxRequestSize = 1024 * 1024 * 10; protected boolean keepalive = false; // 是否長連線 /** * 啟動服務 */ public void start() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // server端接收到的是httpRequest,所以要使用HttpRequestDecoder進行解碼 ch.pipeline().addLast(new HttpRequestDecoder()); // server端傳送的是httpResponse,所以要使用HttpResponseEncoder進行編碼 ch.pipeline().addLast(new HttpResponseEncoder()); //HttpObjectAggregator會把多個訊息轉換為一個單一的FullHttpRequest或是FullHttpResponse // ch.pipeline().addLast(new HttpObjectAggregator(maxRequestSize)); //解決粘包/半包問題 ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 2, 0, 2)); //解決粘包/半包問題 ch.pipeline().addLast(new LengthFieldPrepender(2)); //壓縮 // ch.pipeline().addLast(new HttpContentCompressor()); //處理類 ch.pipeline().addLast(new HsfHttpServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, backlog) .childOption(ChannelOption.SO_KEEPALIVE, keepalive); ChannelFuture f = bootstrap.bind(port).syncUninterruptibly(); f.channel().closeFuture().syncUninterruptibly(); } /** * 停止服務 */ public void stop() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public int getBacklog() { return backlog; } public void setBacklog(int backlog) { this.backlog = backlog; } public boolean isKeepalive() { return keepalive; } public void setKeepalive(boolean keepalive) { this.keepalive = keepalive; } }
package com.ab.hsf.server.http.handler; import com.ab.hsf.analysis.ParamsAnalysis; import com.ab.hsf.bean.HsfServiceBean; import com.ab.hsf.constants.Constants; import com.ab.hsf.data.Invocation; import com.ab.hsf.data.RequestMessage; import com.ab.hsf.data.ResponseMessage; import com.ab.hsf.init.HsfServiceFactoryBean; import com.ab.hsf.reflect.Invoker; import com.ab.hsf.reflect.impl.DefaultInvoker; import com.ab.hsf.util.StringUtils; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.*; import java.io.UnsupportedEncodingException; import java.net.URI; import java.util.Map; import static io.netty.handler.codec.http.HttpHeaders.Names.*; import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; /** * http服務處理類 * User: alex * DateTime: 15-7-23 下午2:09 */ public class HsfHttpServerHandler extends ChannelInboundHandlerAdapter { private HttpRequest request; private ParamsAnalysis paramsAnalysis; private Invoker defaultInvoker; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RequestMessage requestMessage = null; if (msg instanceof HttpRequest) { request = (HttpRequest) msg; URI uri = new URI(request.getUri()); if (uri.getPath().equals("/favicon.ico")) { return; } paramsAnalysis = new ParamsAnalysis(request, request.getUri()); requestMessage = paramsAnalysis.getMethodHandle(request); } if (msg instanceof HttpContent) { HttpContent httpContent = (HttpContent) msg; requestMessage = paramsAnalysis.postMethodHandle(httpContent); } //判斷是否滿足條件 if (requestMessage != null && requestMessage.getErrorMessage() != null) { return; } // 解析http頭部 Map<String,String> httpHeaderMap = paramsAnalysis.parseHeader(request.headers()); //反射取值 String response = invokerHandle(requestMessage,httpHeaderMap); //響應 ctx.write(this.httpResponseHandle(response)); ctx.flush(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } /** * 反射獲取結果 * * @param requestMessage 請求引數 * @return 結果 */ private String invokerHandle(RequestMessage requestMessage,Map<String,String> httpHeaderMap ) { Invocation invocation = requestMessage.getInvocationBody(); HsfServiceBean configBean = HsfServiceFactoryBean.getProvider(invocation.getIfaceId(), invocation.getAlias()); //校驗token if(!checkHeader(configBean,httpHeaderMap)){ return "token is wrong"; } defaultInvoker = new DefaultInvoker(configBean.getInterfacePath(), configBean.getTargetObject()); String result = null; try { ResponseMessage responseMessage = defaultInvoker.invoke(requestMessage); result = String.valueOf(responseMessage.getResponse()); } catch (Exception e) { result = e.getLocalizedMessage(); } return result; } /** * 封裝響應資料資訊 * * @param responseMessage 響應資料 * @return 響應物件 */ private FullHttpResponse httpResponseHandle(String responseMessage) { FullHttpResponse response = null; try { response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(responseMessage.getBytes("UTF-8"))); response.headers().set(CONTENT_TYPE, Constants.RESPONSE_JSON); response.headers().set(CONTENT_LENGTH, response.content().readableBytes()); if (HttpHeaders.isKeepAlive(request)) { response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return response; } /** * 校驗頭資訊 * @param configBean 配置bean * @param httpHeaderMap 頭資訊 * @return true 校驗通過,false校驗失敗 */ private boolean checkHeader(HsfServiceBean configBean,Map<String,String> httpHeaderMap) { boolean flag = false; //需要校驗 if(StringUtils.isNotBlank(configBean.getToken())){ if(httpHeaderMap != null){ //如果token不為空,需要和前臺傳入的token比較,不一致,返回錯誤 String token = httpHeaderMap.get(Constants.TOKEN); if(StringUtils.isNotBlank(token) && configBean.getToken().equals(token)) { //驗證通過 flag = true; } } } else { //驗證通過 flag = true; } return flag; } }
package com.ab.hsf.analysis;
import com.ab.hsf.constants.Constants;
import com.ab.hsf.data.Invocation;
import com.ab.hsf.data.RequestMessage;
import com.ab.hsf.util.ParamsUtils;
import com.ab.hsf.util.ReflectUtils;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* 引數解析類
* User: alex
* DateTime: 15-8-7 下午3:10
*/
public class ParamsAnalysis {
private static final Logger logger = LoggerFactory.getLogger(ParamsAnalysis.class);
private HttpRequest httpRequest;
private String uri;
/**
* 構造方法
* @param httpRequest 請求
* @param uri uri
*/
public ParamsAnalysis(HttpRequest httpRequest, String uri) {
this.httpRequest = httpRequest;
this.uri = uri;
}
/**
* 處理get提交
* @param httpRequest 請求
* @return 結果
*/
public RequestMessage getMethodHandle(HttpRequest httpRequest) {
// 構建請求
RequestMessage requestMessage = new RequestMessage();
HttpMethod reqMethod = httpRequest.getMethod();
if (reqMethod != HttpMethod.GET) {
requestMessage.setErrorMessage("Only allow GET");
return requestMessage;
}
String jsonbody = null;
try {
requestMessage = convertRequestMessage(requestMessage);
Invocation invocation = requestMessage.getInvocationBody();
Object[] paramList = null;
String params = null;
int length = invocation.getArgsType().length;
paramList = new Object[length];
if (uri.indexOf("?") != -1) { //問號傳參形式
params = uri.substring(uri.indexOf("?") + 1);
paramList = ParamsUtils.parseParamArg(invocation.getArgClasses(), params);
} else { //rest傳參形式
paramList = ParamsUtils.parseParamArgForRest(uri);
}
requestMessage.getInvocationBody().setArgs(paramList);
} catch (Throwable e) {
logger.error("Failed to parse http request for uri " + uri + (jsonbody != null ? ", body is " + jsonbody : "") + ".", e);
requestMessage.setErrorMessage("Failed to parse http request for uri " + uri);
}
return requestMessage;
}
/**
* 處理post方法
* @param httpContent 實體
* @return 結果
*/
public RequestMessage postMethodHandle(HttpContent httpContent) {
// 構建請求
RequestMessage requestMessage = new RequestMessage();
HttpMethod reqMethod = httpRequest.getMethod();
if (reqMethod != HttpMethod.POST) {
requestMessage.setErrorMessage("Only allow POST");
return requestMessage;
}
String jsonbody = null;
try {
requestMessage = convertRequestMessage(requestMessage);
Invocation invocation = requestMessage.getInvocationBody();
// 解析請求body
Object[] paramList = null;
ByteBuf buf1 = httpContent.content();
int size = buf1.readableBytes();
byte[] s1 = new byte[size];
buf1.readBytes(s1);
jsonbody = new String(s1, Constants.DEFAULT_CHARSET);
paramList = ParamsUtils.streamParseJson(invocation.getArgClasses(), jsonbody);
if(paramList != null) {
requestMessage.getInvocationBody().setArgs(paramList);
}
} catch (Throwable e) {
logger.error("Failed to parse http request for uri " + uri + (jsonbody != null ? ", body is " + jsonbody : "") + ".", e);
requestMessage.setErrorMessage("Failed to parse http request for uri " + uri);
}
return requestMessage;
}
/**
* 轉換請求頭資訊
* @param requestMessage 請求引數
* @return 結果
*/
private RequestMessage convertRequestMessage(RequestMessage requestMessage) {
// 解析uri
String[] strArr = ParamsUtils.getInterfaceIdAndMethod(uri);
String alias = strArr[0];
String interfaceId = strArr[1];
String methodName = strArr[2];
Invocation invocation = new Invocation();
invocation.setClazzName(interfaceId);
invocation.setIfaceId(interfaceId);
invocation.setMethodName(methodName);
invocation.setAlias(alias);
requestMessage.setInvocationBody(invocation);
Class[] classArray = ReflectUtils.getMethodArgsType(interfaceId, methodName);
if (classArray == null) {
logger.error("params type list can NOT be NULL, please check the interfaceId/methodName. interfaceId:" + interfaceId + " method:" + methodName);
requestMessage.setErrorMessage("params type list can NOT be NULL, please check the interfaceId/methodName. interfaceId:" + interfaceId + " method:" + methodName);
}
requestMessage.getInvocationBody().setArgsType(classArray);
return requestMessage;
}
/**
* 處理頭資訊
*/
public static Map<String,String> parseHeader(HttpHeaders httpHeaders) {
Map<String,String> httpHeaderMap = null;
for (Map.Entry header : httpHeaders) {
if(Constants.ACCEPT.equalsIgnoreCase(header.getKey().toString())) {
String value = String.valueOf(header.getValue());
try {
httpHeaderMap = JSON.parseObject(value, Map.class);
} catch (Exception e) {
logger.error("HttpHeaders Accept is not json data!");
httpHeaderMap = null;
}
}
}
return httpHeaderMap;
}
}
服務端接入方式:
1、下載jar包,或者引入maven依賴
<dependency>
<groupId>hsf</groupId>
<artifactId>hsf</artifactId>
<version>1.0</version>
</dependency>
2、配置XML檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!--實現類 -->
<bean id="providerServiceImpl" class="com.b.asf.provider.impl.ProviderServiceImpl"/>
<!--服務提供者-->
<bean id="providerService" class="com.ab.hsf.bean.HsfServiceBean">
<property name="interfacePath" value="com.b.asf.provider.ProviderService"/>
<property name="targetObject" ref="providerServiceImpl"/>
<property name="alias" value="demo3"/>
<property name="token" value="12345"/>
</bean>
<!--服務配置-->
<bean id="hsfHttpServer" class="com.ab.hsf.server.http.HsfHttpServer">
<property name="port" value="8088"/>
<property name="keepalive" value="true"/>
</bean>
<!--zk管理-->
<bean id="zooKeeperFactory" class="com.ab.hsf.zk.ZookeeperFactory">
<property name="hosts" value="127.0.0.1:2181"/>
<property name="appAddress" value="10.25.3.207:8088"/>
<property name="nameSpace" value="demo3"/>
</bean>
<!--載入服務-->
<bean id="hsfServiceFactoryBean" autowire="no" class="com.ab.hsf.init.HsfServiceFactoryBean">
<property name="serviceList">
<list>
<ref bean="providerService"/>
</list>
</property>
<property name="zookeeperFactory" ref="zooKeeperFactory"/>
<property name="hsfHttpServer" ref="hsfHttpServer"/>
</bean>
</beans>
3、編寫java實現類
宣告介面
public interface ProviderService {
public String getResult(Map params);
}
實現類
public class ProviderServiceImpl implements ProviderService {
public String getResult(Map params){
String r = null;
for(String t : params.keySet()) {
r = params.get(t).toString();
}
return "我是8088:" + r;
}
}
四、路由平臺展示
首頁展示
服務管理展示
服務詳情展示
五、後續
1、增加介面監控告警功能,當服務提供者發生異常時,則通過郵件、簡訊等形式進行告警。
2、增加灰度釋出功能,根據不同版本或者組別,釋出灰度服務
3、增加降級功能,可以根據介面的需求,對介面進行降級操作
4、增加安全攔截,對接入的介面服務做安全校驗