
Integrating Storm with the new Kafka API (Kafka 0.8 and later)

This example uses Storm 1.1.0 and Kafka 0.10.0.1 (the kafka_2.11 build, i.e. Scala 2.11).

  • Import the Maven dependencies

<!-- Storm core -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.0</version>
      <!--<scope>provided</scope>-->
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

<!-- kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.1</version>
    </dependency>

<!-- Kafka (kafka_2.11 build) -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>log4j-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

   <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka-client</artifactId>
      <version>1.1.0</version>
    </dependency>
  • Write the topology main class (Topo)

package com.simon.storm.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

/**
 * Created by Simon on 2018/11/1.
 */
public class NewKafKaTopo {

    public static void main(String[] args) {
        // Create the TopologyBuilder
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        KafkaSpoutConfig.Builder<String, String> kafkaSpoutConfigBuilder;
        // Kafka connection info (bootstrap servers)
        String bootstrapServers="192.168.1.86:9092,192.168.1.87:9093,192.168.1.88:9094";
        // Topic name
        String topic = "test";
        /**
         * Construct the KafkaSpoutConfig.Builder
         *
         * bootstrapServers:    Kafka broker addresses, ip:port
         * StringDeserializer:  key deserializer for the topic
         * StringDeserializer:  value deserializer for the topic
         * topic:               topic name
         */
        kafkaSpoutConfigBuilder = new KafkaSpoutConfig.Builder<>(
                bootstrapServers,
                StringDeserializer.class,
                StringDeserializer.class,
                topic);

        // Use the builder to construct the KafkaSpoutConfig, setting the relevant properties
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = kafkaSpoutConfigBuilder
                /**
                 * Set the consumer group id
                 */
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, topic.toLowerCase() + "_storm_group")

                /**
                 * Session timeout; the value must lie within
                 * [group.min.session.timeout.ms, group.max.session.timeout.ms] = [6000, 300000]
                 * Default: 10000
                 */
                .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100000")

                /**
                 * Maximum amount of data fetched per partition
                 */
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576")

                /**
                 * Maximum time the client waits for a request response
                 * Default: 30000
                 */
                .setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000")

                /**
                 * Expected interval between heartbeats to the consumer coordinator.
                 * Heartbeats keep the consumer's session alive and drive rebalancing
                 * when consumers join or leave the group.
                 * Default: 3000 (usually set below one third of session.timeout.ms)
                 */
                .setProp(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30000")

                /**
                 * Offset commit period: 15 s (default 30 s)
                 */
                .setOffsetCommitPeriodMs(15000)

                /**
                 * Maximum number of records per poll; ideally all of them can be
                 * processed within the session timeout
                 */
                .setMaxPollRecords(20)

                /**
                 * First-poll offset strategy
                 */
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)

                /**
                 * Build the KafkaSpoutConfig
                 */
                .build();

        // Register the spout
        topologyBuilder.setSpout("kafkaSpout", new KafkaSpout<>(kafkaSpoutConfig));

        // Register the bolt, consuming from the spout
        topologyBuilder.setBolt("KafkaSpoutBolt", new KafkaSpoutBolt()).localOrShuffleGrouping("kafkaSpout");

        Config config = new Config();
        /**
         * Heartbeat timeout between supervisor and worker, in seconds;
         * if it is exceeded, the supervisor restarts the worker
         */
        config.put("supervisor.worker.timeout.secs",600000);
        /**
         * Session timeout between Storm and ZooKeeper
         */
        config.put("storm.zookeeper.session.timeout",1200000000);
        /**
         * Enable debug mode for more verbose log output;
         * use this only when running locally with LocalCluster
         */
        config.setDebug(true);
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("NewKafKaTopo", config, topologyBuilder.createTopology());
        Utils.sleep(Long.MAX_VALUE);
        localCluster.shutdown();

    }
}
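
The LocalCluster run above is only for local testing. To submit the same topology to a real Storm cluster, the storm-core dependency should be switched back to <scope>provided</scope> and StormSubmitter used instead of LocalCluster. The following is a minimal sketch under those assumptions; the class name NewKafKaClusterTopo and the worker count are made up for illustration, not part of the original example.

package com.simon.storm.kafka;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Hypothetical cluster-submission variant of NewKafKaTopo.
 */
public class NewKafKaClusterTopo {

    public static void main(String[] args) throws Exception {
        // Same spout configuration as the local example, kept to the builder defaults for brevity
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = new KafkaSpoutConfig.Builder<>(
                "192.168.1.86:9092,192.168.1.87:9093,192.168.1.88:9094",
                StringDeserializer.class,
                StringDeserializer.class,
                "test")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
                .build();

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("kafkaSpout", new KafkaSpout<>(kafkaSpoutConfig));
        topologyBuilder.setBolt("KafkaSpoutBolt", new KafkaSpoutBolt()).localOrShuffleGrouping("kafkaSpout");

        Config config = new Config();
        // Number of worker processes; an illustrative value
        config.setNumWorkers(2);

        // Package the project with mvn package, then submit with:
        // storm jar <your-jar> com.simon.storm.kafka.NewKafKaClusterTopo
        StormSubmitter.submitTopology("NewKafKaClusterTopo", config, topologyBuilder.createTopology());
    }
}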
  • Write the processing bolt

package com.simon.storm.kafka;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Created by Simon on 2018/10/23.
 */
public class KafkaSpoutBolt extends BaseBasicBolt {
    @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Simply print the message. The KafkaSpout's default output fields are
        // (topic, partition, offset, key, value), so read the "value" field by name
        String value = tuple.getStringByField("value");
        System.out.println(value);
    }

    @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt only prints to stdout and emits nothing downstream, so no output fields are declared
    }
}

Run the main method and the topology starts locally.
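
To see output from the bolt, there must be messages on the test topic. Below is a minimal producer sketch that pushes a few test messages; the class name TestMessageProducer and the message contents are made up for illustration, and it reuses the kafka-clients 0.10.0.1 dependency and broker list already declared above.

package com.simon.storm.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Hypothetical test producer that writes a few messages to the "test" topic.
 */
public class TestMessageProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.86:9092,192.168.1.87:9093,192.168.1.88:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer after the messages are flushed
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Key and value are arbitrary test data
                producer.send(new ProducerRecord<>("test", "key-" + i, "message-" + i));
            }
            producer.flush();
        }
    }
}

Run the producer while the topology is up and the bolt should print each message value to the console.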