Springboot2(24)整合netty實現http服務(類似SpingMvc的contoller層實現)
阿新 • • 發佈:2018-12-29
springboot2教程系列
其它netty檔案有部落格Springboot2(24)整合netty實現http服務(類似SpingMvc的contoller層實現)
SpringBoot中使用Netty與spring中使用Netty沒有差別,在Spring中使用Netty可以考慮Netty的啟動時機,可以在Bean載入的時候啟動,可以寫一個自執行的函式啟動,這裡採用監聽Spring容器的啟動事件來啟動Netty。
實現類似SpingMvc的contoller層實現
新增依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.1.Final</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>${commons.lang.version}</version> </dependency>
排除tomcat的依賴
Netty Http服務端編寫
handler 處理類
@Component
@Slf4j
@ChannelHandler.Sharable //@Sharable 註解用來說明ChannelHandler是否可以在多個channel直接共享使用
public class HttpServerHandler extends ChannelInboundHandlerAdapter {
private static Map<String, Action> actionMapping = null;
public Map< String, Action> getActionMapping(){
if(actionMapping == null){
return actionMapping = ClassLoaderUtil.buildActionMapping();
}
return actionMapping;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (msg instanceof FullHttpRequest) {
FullHttpRequest req = (FullHttpRequest) msg;
if (HttpUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
}
//封裝請求和響應
HttpRequestWrapper httpRequestWrapper = buildRequestWraper(req);
//建造netty響應
HttpResponseWrapper httpResponseWrapper = new HttpResponseWrapper();
Action action = getActionMapping().get(httpRequestWrapper.getUri());
if(action != null){
Object responseBody = null;
Object object = action.getMethod().invoke(action.getBean(),buildArgs(
action, httpRequestWrapper.getParams(),httpRequestWrapper));
if(StringUtils.isNotEmpty(action.getResponseType()) &&
action.getResponseType().equals("JSON")){
responseBody = JSON.toJSONString(object);
}else{
responseBody = object;
}
httpResponseWrapper.write(object.toString().getBytes("UTF-8"));
}
FullHttpResponse response = buildResponse(httpResponseWrapper);
boolean keepAlive = HttpUtil.isKeepAlive(req);
if (!keepAlive) {
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
//負責顯式釋放與池的ByteBuf例項相關聯的記憶體,SimpleChannelInboundHandler會自動釋放資源,因此無需顯式釋放
ReferenceCountUtil.release(msg);
} catch (Exception e) {
log.error("system exception:{}", e);
}
}
/**
* 構建請求物件
* @param req
* @return
*/
private HttpRequestWrapper buildRequestWraper(FullHttpRequest req) {
HashMap<String, String> headersMap = new HashMap<String, String>(16);
for (Map.Entry<String, String> entry : req.headers()) {
headersMap.put(entry.getKey(), entry.getValue());
}
byte[] content = new byte[req.content().readableBytes()];
req.content().readBytes(content);
String url = req.uri();
String params = "";
if(url.indexOf("?")>0){
String[] urls = url.split("\\?");
url = urls[0];
params = urls[1];
}
return new HttpRequestWrapper(req.method().name(), url, headersMap, content, params);
}
/**
* 構建響應物件
* @param httpResponseWrapper
* @return
*/
private FullHttpResponse buildResponse(HttpResponseWrapper httpResponseWrapper) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.valueOf(httpResponseWrapper.getStatusCode()), Unpooled.wrappedBuffer(httpResponseWrapper.content()));
HttpHeaders headers = response.headers();
headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
headers.set(HttpHeaderNames.CONTENT_TYPE, new AsciiString("application/json; charset=utf-8"));
for (Map.Entry<String, String> entry : httpResponseWrapper.headers().entrySet()) {
headers.set(entry.getKey(), entry.getValue());
}
return response;
}
/**
* 構建請求引數
* @param action
* @param urlParams
* @param httpRequestWrapper
* @return
*/
public Object[] buildArgs(Action action,String urlParams,HttpRequestWrapper httpRequestWrapper){
if(action == null){
return null;
}
LocalVariableTableParameterNameDiscoverer u = new LocalVariableTableParameterNameDiscoverer();
//獲取處理方法的引數
String[] params = u.getParameterNames(action.getMethod());
Object[] objects = new Object[params.length];
Map<String,String> paramMap = new HashMap<>();
try{
if(StringUtils.isNotEmpty(urlParams)){
paramMap = UrlUtils.URLRequest(urlParams);
}
if( httpRequestWrapper.content()!=null){
Parameter[] parameters = action.getMethod().getParameters();
for(Parameter parameter : parameters){
Annotation annotation = parameter.getAnnotation(ActionBody.class);
if(annotation == null){
continue;
}
int index = Integer.parseInt(parameter.getName().substring(3));
paramMap.put(params[index],new String(httpRequestWrapper.content(),"UTF-8"));
}
}
for( int i = 0 ;i<params.length; i++){
final int flag = i;
paramMap.forEach((k,v)->{
if(k.equals(params[flag])){
objects[flag] = v;
}
});
}
}catch(Exception e){
log.error(e.getMessage());
}
return objects;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 當客戶端斷開連線
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.close();
}
}
controller層實現
@Component
@ActionMapping(actionKey="controller")
@ResponseType
public class HttpNettyController implements BaseActionController{
@ActionMapping(actionKey = "method")
public String method(String a, String b){
return String.format("a:%s,b:%s",a,b);
}
}
ChannelPipeline 實現
@Component
@ConditionalOnProperty( //配置檔案屬性是否為true
value = {"netty.http.enabled"},
matchIfMissing = false
)
public class HttpPipeline extends ChannelInitializer<SocketChannel> {
@Autowired
HttpServerHandler httpServerHandler;
@Override
public void initChannel(SocketChannel ch) {
// log.error("test", this);
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec());
p.addLast(new HttpContentCompressor());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new ChunkedWriteHandler());
// http請求根處理器
p.addLast(httpServerHandler);
}
}
服務實現
package cn.myframe.netty.server;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.myframe.netty.pipeline.HttpPipeline;
import cn.myframe.properties.NettyHttpProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
/**
*
* HTTP服務
*
* @author ynz
* @email [email protected]
* @version 建立時間:2018年6月25日 下午5:39:36
*/
@Configuration
@EnableConfigurationProperties({NettyHttpProperties.class})
@ConditionalOnProperty( //配置檔案屬性是否為true
value = {"netty.http.enabled"},
matchIfMissing = false
)
@Slf4j
public class HttpServer {
@Autowired
HttpPipeline httpPipeline;
@Autowired
NettyHttpProperties nettyHttpProperties;
@Bean("starHttpServer")
public String start() {
// 準備配置
// HttpConfiguration.me().setContextPath(contextPath).setWebDir(webDir).config();
// 啟動伺服器
Thread thread = new Thread(() -> {
NioEventLoopGroup bossGroup =
new NioEventLoopGroup(nettyHttpProperties.getBossThreads());
NioEventLoopGroup workerGroup =
new NioEventLoopGroup(nettyHttpProperties.getWorkThreads());
try {
log.info("start netty [HTTP] server ,port: " + nettyHttpProperties.getPort());
ServerBootstrap boot = new ServerBootstrap();
options(boot).group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(httpPipeline);
Channel ch = null;
//是否繫結IP
if(StringUtils.isNotEmpty(nettyHttpProperties.getBindIp())){
ch = boot.bind(nettyHttpProperties.getBindIp(),
nettyHttpProperties.getPort()).sync().channel();
}else{
ch = boot.bind(nettyHttpProperties.getPort()).sync().channel();
}
ch.closeFuture().sync();
} catch (InterruptedException e) {
log.error("啟動NettyServer錯誤", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
thread.setName("http_Server");
// thread.setDaemon(true);
thread.start();
return "http start";
}
private ServerBootstrap options(ServerBootstrap boot) {
/*if (HttpConfiguration.me().getSoBacklog() > 0) {
boot.option(ChannelOption.SO_BACKLOG, HttpConfiguration.me().getSoBacklog());
}*/
return boot;
}
}
啟動配置
---application.yml
spring.profiles.active: http
---application-http.yml
netty:
http:
enabled: true
port: 1999
bossThreads: 2
workThreads: 4