1. 程式人生 > >Storm系列(六)storm和kafka整合

Storm系列(六)storm和kafka整合

使用kafka-client jar進行Storm Apache Kafka整合

這包括新的Apache Kafka消費者API。相容性 Apache Kafka版本0.10起 引入jar包

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka-client</artifactId>
      <version>1.2.0</version>
    </dependency>
複製程式碼

原文連結:a870439570.github.io/interview-d…

從kafka中訂閱訊息讀取

通過使用KafkaSpoutConfig類來配置spout實現。此類使用Builder模式,可以通過呼叫其中一個Builders建構函式或通過呼叫KafkaSpoutConfig類中的靜態方法構建器來啟動。

用法示例

建立一個簡單的不kafka資料來源 以下將使用釋出到“topic”的所有事件,並將它們傳送到MyBolt,其中包含“topic”,“partition”,“offset”,“key”,“value”欄位。

  TopologyBuilder tp = new TopologyBuilder();
            tp.setSpout("kafka_spout"
, new KafkaSpout(KafkaSpoutConfig.builder("localhost:9092" , "qxw").build()), 1); tp.setBolt("bolt", new MyBolt()).shuffleGrouping("kafka_spout"); Config cfg=new Config(); cfg.setNumWorkers(1);//指定工作程序數 (jvm數量,分散式環境下可用,本地模式設定無意義) cfg.setDebug(true); LocalCluster locl=new LocalCluster(); locl.submitTopology("kkafka-topo"
,cfg,tp.createTopology()); 複製程式碼
 public static  class MyBolt extends BaseBasicBolt{
            public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
                System.err.println("接受訂閱kafka訊息:  "+tuple.getStringByField("topic"));
                System.err.println("接受訂閱kafka訊息:  "+tuple.getStringByField("value"));
            }
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            }
        }
複製程式碼