1. 程式人生 > 其它 >容器編排系統K8s之節點汙點和pod容忍度

容器編排系統K8s之節點汙點和pod容忍度

6280人閱讀 Netty是業界最流行的nio框架之一,結合springboot可以滿足快速開發   MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸協議),是一種基於釋出/訂閱(publish/subscribe)模式的"輕量級"通訊協議,該協議構建於TCP/IP協議上的。MQTT協議的可以用在物聯網、小型裝置、還有移動應用上。   Netty也可以實現MQTT協議,他的內部封裝了MQTT協議的相關物件。   使用Netty+SpringBoot方式可以快速地開發一套基於MQTT協議(主要是MQTT3.1和MQTT3.1.1)的服務端程式   SpringBoot+Netty建立,pom.xml檔案匯入依賴包      <properties>   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>   <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>   <java.version>1.8</java.version>  </properties>    <parent>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-parent</artifactId>   <version>2.1.6.RELEASE</version>   <relativePath /> <!-- lookup parent from repository -->  </parent>    <dependencies>     <!--web模組的啟動器 -->   <dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId>   </dependency>   <!-- netty依賴 springboot2.x自動匯入版本 -->   <dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>   </dependency>     </dependencies>  Springboot啟動類,直接在main裡面啟動netty的MQTT服務(也包含web應用的)   package boot.example.mqtt.server;   import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;   import boot.example.mqtt.server.server.BootNettyServer;       @SpringBootApplication public class BootNettyApplication  {     public static void main( String[] args )     {   SpringApplication app = new SpringApplication(BootNettyApplication.class);   app.run(args);   // 啟動 1883         new BootNettyServer().startup();     } } Netty的MQTT啟動類   package boot.example.mqtt.server.server;     import boot.example.mqtt.server.adapter.BootChannelInboundHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.timeout.IdleStateHandler;     public class BootNettyServer {    private int port = 1883;    private NioEventLoopGroup bossGroup;    private NioEventLoopGroup workGroup;    /**   * 啟動服務   * @throws InterruptedException    */  public void startup() {     try {    bossGroup = new NioEventLoopGroup(1);    workGroup = new NioEventLoopGroup();      ServerBootstrap bootstrap = new ServerBootstrap();    bootstrap.group(bossGroup, workGroup);    bootstrap.channel(NioServerSocketChannel.class);      bootstrap.option(ChannelOption.SO_REUSEADDR, true)      .option(ChannelOption.SO_BACKLOG, 1024)      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)      .option(ChannelOption.SO_RCVBUF, 10485760);      bootstrap.childOption(ChannelOption.TCP_NODELAY, true)      .childOption(ChannelOption.SO_KEEPALIVE, true)      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);      bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {       protected void initChannel(SocketChannel ch) {        ChannelPipeline channelPipeline = ch.pipeline();        // 設定讀寫空閒超時時間        channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));        channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);        channelPipeline.addLast("decoder", new MqttDecoder());        channelPipeline.addLast(new BootChannelInboundHandler());       }      });    ChannelFuture f = bootstrap.bind(port).sync();    f.channel().closeFuture().sync();       } catch (Exception e) {    System.out.println("start exception"+e.toString());   }    }    /**   * 關閉服務   */  public void shutdown() throws InterruptedException {   if (workGroup != null && bossGroup != null) {    bossGroup.shutdownGracefully();    work     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {         sup