容器編排系統K8s之節點汙點和pod容忍度
阿新 • • 發佈:2022-05-07
6280人閱讀
Netty是業界最流行的nio框架之一,結合springboot可以滿足快速開發
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是一種基於釋出/訂閱(publish/subscribe)模式的"輕量級"通訊協議,該協議構建於TCP/IP協議上的。MQTT協議的可以用在物聯網、小型裝置、還有移動應用上。
Netty也可以實現MQTT協議,他的內部封裝了MQTT協議的相關物件。
使用Netty+SpringBoot方式可以快速地開發一套基於MQTT協議(主要是MQTT3.1和MQTT3.1.1)的服務端程式
SpringBoot+Netty建立,pom.xml檔案匯入依賴包
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!--web模組的啟動器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- netty依賴 springboot2.x自動匯入版本 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
Springboot啟動類,直接在main裡面啟動netty的MQTT服務(也包含web應用的)
package boot.example.mqtt.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import boot.example.mqtt.server.server.BootNettyServer;
@SpringBootApplication
public class BootNettyApplication
{
public static void main( String[] args )
{
SpringApplication app = new SpringApplication(BootNettyApplication.class);
app.run(args);
// 啟動 1883
new BootNettyServer().startup();
}
}
Netty的MQTT啟動類
package boot.example.mqtt.server.server;
import boot.example.mqtt.server.adapter.BootChannelInboundHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
public class BootNettyServer {
private int port = 1883;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
/**
* 啟動服務
* @throws InterruptedException
*/
public void startup() {
try {
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_RCVBUF, 10485760);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
// 設定讀寫空閒超時時間
channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast(new BootChannelInboundHandler());
}
});
ChannelFuture f = bootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
System.out.println("start exception"+e.toString());
}
}
/**
* 關閉服務
*/
public void shutdown() throws InterruptedException {
if (workGroup != null && bossGroup != null) {
bossGroup.shutdownGracefully();
work
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
sup