Integrating Storm and Kafka
Dependencies
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>
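Note that storm-core and storm-kafka-client are both declared with provided scope, so by default only kafka-clients ends up in the topology jar and the Storm jars are expected to be supplied by the cluster. If the cluster does not already ship storm-kafka-client (for example via Storm's extlib directory), one common option is to switch storm-kafka-client to the default compile scope and build a fat jar. A minimal sketch using the Maven shade plugin follows (the plugin version is illustrative; storm-core should stay provided either way):

<build>
    <plugins>
        <!-- Sketch: bundles compile-scope dependencies such as storm-kafka-client and
             kafka-clients into the topology jar; provided-scope jars like storm-core are left out. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>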
App
package test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class App {

    public static void main(String[] args) throws Exception {
        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                .builder("worker1:9092,worker2:9092,worker3:9092", "test") // your Kafka broker list and topic
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "consumer") // consumer group id; any name works
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024 * 4)
                // .setRecordTranslator(new MyRecordTranslator())
                .setRecordTranslator( // translator: turns each Kafka record into a tuple; adapt as needed
                        new MyRecordTranslator(),
                        new Fields("word")
                )
                .setRetry( // retry policy for tuples that fail processing
                        new KafkaSpoutRetryExponentialBackoff(
                                new TimeInterval(500L, TimeUnit.MICROSECONDS),
                                TimeInterval.milliSeconds(2),
                                Integer.MAX_VALUE,
                                TimeInterval.seconds(10)
                        )
                )
                .setOffsetCommitPeriodMs(10000)
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                .setMaxUncommittedOffsets(250)
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout<String, String>(conf), 1);
        builder.setBolt("Recieve", new RecieveBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("Consume", new ConsumeBolt(), 1).globalGrouping("Recieve");
        builder.createTopology();

        // Run on the cluster
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // StormSubmitter.submitTopology("teststorm", config, builder.createTopology());

        // Local test
        // Config config = new Config();
        // config.setNumWorkers(3);
        // config.setDebug(true);
        // config.setMaxTaskParallelism(20);
        // LocalCluster cluster = new LocalCluster();
        // cluster.submitTopology("teststorm", config, builder.createTopology());
        // Utils.sleep(60000);
        // // finished, shut down the cluster
        // cluster.shutdown();
    }
}

class MyRecordTranslator implements Func<ConsumerRecord<String, String>, List<Object>> {

    private static final long serialVersionUID = 1L;

    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        return new Values(record.value());
    }
}
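MyRecordTranslator above only keeps the record value. Because setRecordTranslator accepts any Func<ConsumerRecord<K, V>, List<Object>> together with the matching output Fields, the same hook can also expose Kafka metadata, which is handy for debugging. A minimal sketch (not part of the original post; the class and field names are illustrative), placed next to MyRecordTranslator so it shares the same imports:

// Sketch: register with
// .setRecordTranslator(new MetadataRecordTranslator(), new Fields("topic", "partition", "offset", "value"))
class MetadataRecordTranslator implements Func<ConsumerRecord<String, String>, List<Object>> {

    private static final long serialVersionUID = 1L;

    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        // emit the Kafka metadata alongside the payload
        return new Values(record.topic(), record.partition(), record.offset(), record.value());
    }
}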
ConsumeBolt
package test;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ConsumeBolt extends BaseRichBolt {

    private static final long serialVersionUID = -7114915627898482737L;

    private FileWriter fileWriter = null;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // write received words to a randomly named file
            fileWriter = new FileWriter("/usr/local/tmpdata/" + UUID.randomUUID());
            // fileWriter = new FileWriter("C:\\Users\\26401\\Desktop\\test\\" + UUID.randomUUID());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void execute(Tuple tuple) {
        try {
            String word = tuple.getStringByField("word") + "......." + "\n";
            fileWriter.write(word);
            fileWriter.flush();
            System.out.println(word);
            collector.ack(tuple); // ack the tuple once it has been written out
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
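ConsumeBolt opens a FileWriter in prepare() but never closes it. Below is a minimal sketch of a cleanup() override that could be added to ConsumeBolt (not in the original code); Storm only guarantees cleanup() runs in local mode, so treat this as best-effort resource release:

// Sketch: add to ConsumeBolt to release the file handle on an orderly shutdown.
@Override
public void cleanup() {
    try {
        if (fileWriter != null) {
            fileWriter.close();
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}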
RecieveBolt
package test;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class RecieveBolt extends BaseRichBolt {

    private static final long serialVersionUID = -4758047349803579486L;

    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // transform the tuple value passed along by the spout
        this.collector.emit(new Values(tuple.getStringByField("word") + "!!!"));
        this.collector.ack(tuple); // ack the spout tuple so the KafkaSpout can commit its offset
    }

    // declare the field name of the emitted tuples
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
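RecieveBolt emits its tuples unanchored, so once it acks the spout tuple the KafkaSpout treats the message as done even if ConsumeBolt later fails. If end-to-end at-least-once processing is wanted, here is a minimal sketch (not in the original) of an anchored execute() for RecieveBolt; ConsumeBolt would then call ack or fail on its input tuple to drive replays from Kafka:

// Sketch: anchored variant of RecieveBolt.execute(). The emitted tuple is tied to the
// incoming spout tuple, so a downstream fail() makes the KafkaSpout replay the record.
public void execute(Tuple tuple) {
    this.collector.emit(tuple, new Values(tuple.getStringByField("word") + "!!!"));
    this.collector.ack(tuple);
}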