Springboot2(26)整合netty實現websocket通訊
阿新 • • 發佈:2018-12-29
springboot2教程系列
其它netty檔案有部落格Springboot2(24)整合netty實現http服務(類似SpingMvc的contoller層實現)
實現websocket通訊,和廣播訊息
新增依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.1.Final</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>${commons.lang.version}</version> </dependency>
排除tomcat的依賴
Netty Http服務端編寫
handler 處理類
@Component
@Slf4j
@ChannelHandler.Sharable //@Sharable 註解用來說明ChannelHandler是否可以在多個channel直接共享使用
@ConditionalOnProperty( //配置檔案屬性是否為true
value = {"netty.ws.enabled"},
matchIfMissing = false
)
public class WsServerHandler extends ChannelInboundHandlerAdapter {
@Autowired
NettyWsProperties nettyWsProperties;
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private WebSocketServerHandshaker handshaker;
//websocket握手升級繫結頁面
String wsFactoryUri = "";
@Value("${netty.ws.endPoint:/ws}")
private String wsUri;
//static Set<Channel> channelSet = new HashSet<>();
/*
* 握手建立
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.add(incoming);
}
/*
* 握手取消
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
channels.remove(incoming);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
//websocket訊息處理(只支援文字)
public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// 關閉請求
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// ping請求
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 只支援文字格式,不支援二進位制訊息
if (frame instanceof TextWebSocketFrame) {
//接收到的訊息
String requestmsg = ((TextWebSocketFrame) frame).text();
TextWebSocketFrame tws = new TextWebSocketFrame(requestmsg);
channels.writeAndFlush(tws);
}
}
// 第一次請求是http請求,請求頭包括ws的資訊
public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request)
throws Exception {
// 如果HTTP解碼失敗,返回HTTP異常
if (request instanceof HttpRequest) {
HttpMethod method = request.getMethod();
// 如果是websocket請求就握手升級
if (wsUri.equalsIgnoreCase(request.getUri())) {
System.out.println(" req instanceof HttpRequest");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
wsFactoryUri, null, false);
handshaker = wsFactory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
}
handshaker.handshake(ctx.channel(), request);
}
}
}
// 異常處理,netty預設是關閉channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 讀資料超時
} else if (event.state() == IdleState.WRITER_IDLE) {
// 寫資料超時
} else if (event.state() == IdleState.ALL_IDLE) {
// 通道長時間沒有讀寫,服務端主動斷開連結
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
ChannelPipeline 實現
@Component
@ConditionalOnProperty( //配置檔案屬性是否為true
value = {"netty.ws.enabled"},
matchIfMissing = false
)
public class WsPipeline extends ChannelInitializer<SocketChannel>{
@Autowired
WsServerHandler wsServerHandler;
private static final int READ_IDEL_TIME_OUT = 3; // 讀超時
private static final int WRITE_IDEL_TIME_OUT = 4;// 寫超時
private static final int ALL_IDEL_TIME_OUT = 5; // 所有超時
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.MINUTES));
p.addLast("http-codec", new HttpServerCodec());
p.addLast("aggregator", new HttpObjectAggregator(65536));
p.addLast("http-chunked", new ChunkedWriteHandler());
p.addLast("handler",wsServerHandler);
}
}
服務實現
@Configuration
@EnableConfigurationProperties({NettyWsProperties.class})
@ConditionalOnProperty( //配置檔案屬性是否為true
value = {"netty.ws.enabled"},
matchIfMissing = false
)
@Slf4j
public class WsServer {
@Autowired
WsPipeline wsPipeline;
@Autowired
NettyWsProperties nettyWsProperties;
@Bean("starWsServer")
public String start() {
// 準備配置
// HttpConfiguration.me().setContextPath(contextPath).setWebDir(webDir).config();
// 啟動伺服器
Thread thread = new Thread(() -> {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(nettyWsProperties.getBossThreads());
NioEventLoopGroup workerGroup = new NioEventLoopGroup(nettyWsProperties.getWorkThreads());
try {
log.info("start netty [WebSocket] server ,port: " + nettyWsProperties.getPort());
ServerBootstrap boot = new ServerBootstrap();
options(boot).group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(wsPipeline);
Channel ch = null;
//是否繫結IP
if(StringUtils.isNotEmpty(nettyWsProperties.getBindIp())){
ch = boot.bind(nettyWsProperties.getBindIp(),nettyWsProperties.getPort()).sync().channel();
}else{
ch = boot.bind(nettyWsProperties.getPort()).sync().channel();
}
ch.closeFuture().sync();
} catch (InterruptedException e) {
log.error("啟動NettyServer錯誤", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
});
thread.setName("Ws_Server");
thread.start();
return "ws start";
}
private ServerBootstrap options(ServerBootstrap boot) {
boot.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
return boot;
}
}
啟動配置
---application.yml
spring.profiles.active: ws
---application-ws.yml
netty:
ws:
enabled: true
port: 9988
endPoint: /ws
測試
在瀏覽器開啟多個http://127.0.0.1:8080/socket.html