Storm as a New Consumer for Kafka 0.10.x+ (Part 1)
00 Background
As Kafka has been upgraded, the way Storm consumes data from Kafka 0.10.x+ differs from earlier versions. This post records the new approach as a reference for future scenarios where Storm needs to process data from recent Kafka versions in real time.
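Concretely, the old storm-kafka module (which tracked offsets through ZooKeeper) gives way to storm-kafka-client, which wraps Kafka's new consumer API; that is the module the code below imports from. A typical Maven setup might look like the following, where the version numbers are only placeholders and should match your cluster:

<!-- storm-kafka-client provides the new-consumer KafkaSpout used in this post;
     the versions here are illustrative only -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.2</version>
</dependency>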
01 Architecture Overview
The overall architecture is as follows. Data is collected with a two-tier Flume setup: the first tier collects, the second tier aggregates, which spreads the load evenly. The second tier then delivers the data to Kafka; Storm reads from Kafka in real time, processes the data as required, and writes the results to the appropriate storage layer. This post focuses on the Storm side: reading data from Kafka and processing it.
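Flume itself is not the focus here, but for orientation, a second-tier agent typically forwards the aggregated events to Kafka with a Kafka sink roughly along these lines (the agent, channel, and property values are illustrative):

# Hypothetical second-tier Flume agent: aggregated events go to the word-count topic
a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k1.kafka.bootstrap.servers = master:9092,slave:9092,slave3:9092
a2.sinks.k1.kafka.topic = word-count
a2.sinks.k1.channel = c1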
02 Demo
Step 1: Write data to the file being collected by Flume.
Step 2: Check that the records arrive in the Kafka topic; command-line equivalents of both steps are sketched below.
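The original post verified these steps with screenshots; on the command line the rough equivalent would be the following (the file path and Kafka installation directory are assumptions):

# Step 1: append a test record to the file the first-tier Flume agent is tailing
echo "白日依山盡,黃河入海流。" >> /data/collect/poetry.log

# Step 2: watch the topic with the console consumer to confirm arrival
bin/kafka-console-consumer.sh --bootstrap-server master:9092 \
    --topic word-count --from-beginning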
As the output shows, the data has been written to Kafka successfully.
Step 3: Run the Storm word count program and check the result.
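In local mode the topology can be run straight from the IDE; submitting to a cluster would look roughly like this (the jar name is an assumption; the trailing argument becomes the topology name via args[0]):

storm jar storm-learning-1.0.jar \
    com.ccc.storm.learning.topology.kafka.KafkaWordCountTopology \
    kafka-word-count-topology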
03 Code
Storm processes the data in the following flow: KafkaSpout reads records from the topic, KafkaSplitSentenceBolt splits each record into single characters, KafkaWordCountBolt keeps a running count per character, and KafkaOutputBolt collects and prints the final counts.
KafkaWordCountTopology

package com.ccc.storm.learning.topology.kafka;

import com.ccc.storm.learning.bolt.kafka.KafkaOutputBolt;
import com.ccc.storm.learning.bolt.kafka.KafkaSplitSentenceBolt;
import com.ccc.storm.learning.bolt.kafka.KafkaWordCountBolt;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author ccc
 */
public class KafkaWordCountTopology {

    private static final String TOPICS = "word-count";
    private static final String KEYS = "word";
    private static final String BOOTSTRAP_SERVERS = "master:9092,slave:9092,slave3:9092";
    private static final String KAFKA_WORD_COUNT_SPOUT_ID = "KafkaWordCountSpout";
    private static final String SPLIT_SENTENCE_BOLT_ID = "SplitSentenceBolt";
    private static final String KAFKA_WORD_COUNT_BOLT_ID = "KafkaWordCountBolt";
    private static final String KAFKA_OUTPUT_BOLT_ID = "KafkaOutputBolt";

    public static void main(String[] args) throws Exception {
        // The new-consumer spout talks directly to the brokers; no ZooKeeper address needed
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPICS)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setTupleTrackingEnforced(true)
                .build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

        TopologyBuilder tp = new TopologyBuilder();
        tp.setSpout(KAFKA_WORD_COUNT_SPOUT_ID, kafkaSpout, 2);
        tp.setBolt(SPLIT_SENTENCE_BOLT_ID, new KafkaSplitSentenceBolt(), 2)
                .setNumTasks(2)
                .shuffleGrouping(KAFKA_WORD_COUNT_SPOUT_ID);
        // fieldsGrouping on "word" guarantees the same word always reaches the same task
        tp.setBolt(KAFKA_WORD_COUNT_BOLT_ID, new KafkaWordCountBolt(), 4)
                .setNumTasks(2)
                .fieldsGrouping(SPLIT_SENTENCE_BOLT_ID, new Fields(KEYS));
        // globalGrouping funnels every count into a single task for the final output
        tp.setBolt(KAFKA_OUTPUT_BOLT_ID, new KafkaOutputBolt(), 2)
                .setNumTasks(2)
                .globalGrouping(KAFKA_WORD_COUNT_BOLT_ID);

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Submit to the cluster
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, tp.createTopology());
        } else {
            // Local test
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-word-count-topology", conf, tp.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
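The spout above opts for AT_MOST_ONCE delivery, which favours speed over completeness. If dropped records are unacceptable, the guarantee can be tightened; a minimal sketch, assuming storm-kafka-client 1.2.x (the group id and offset strategy shown are illustrative):

// Requires: import org.apache.kafka.clients.consumer.ConsumerConfig;
// At-least-once variant: failed tuples are replayed from Kafka,
// so downstream counting may see duplicates
KafkaSpoutConfig<String, String> atLeastOnceConfig =
        KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPICS)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafka-word-count-group")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .build();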
KafkaSplitSentenceBolt

package com.ccc.storm.learning.bolt.kafka;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * @author ccc
 */
public class KafkaSplitSentenceBolt implements IBasicBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The KafkaSpout's default translator emits the fields
        // "topic", "partition", "offset", "key" and "value";
        // the record payload is carried in "value"
        String poetry = tuple.getStringByField("value");
        // Split the line into sentences on the comma, strip the full stop,
        // then split("") so that every single character is emitted as a "word"
        List<String> sentences = Arrays.asList(poetry.split(","));
        sentences.forEach(sentence -> {
            List<String> words = Arrays.asList(sentence.replace("。", "").split(""));
            words.forEach(word -> collector.emit(new Values(word)));
        });
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
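To see what this bolt emits for a single record, the splitting rule can be exercised on its own; the sample line is illustrative (note the ASCII comma, which is what split(",") matches):

// Standalone check of the character-splitting rule used by the bolt
public class SplitCheck {
    public static void main(String[] args) {
        String poetry = "白日依山盡,黃河入海流。";
        for (String sentence : poetry.split(",")) {
            for (String word : sentence.replace("。", "").split("")) {
                System.out.print(word + " ");
                // prints: 白 日 依 山 盡 黃 河 入 海 流
            }
        }
    }
}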
KafkaWordCountBolt

package com.ccc.storm.learning.bolt.kafka;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * @author ccc
 */
public class KafkaWordCountBolt implements IBasicBolt {

    // Per-task running counts; fieldsGrouping on "word" upstream guarantees
    // that every occurrence of a given word lands in the same task
    private Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        Integer counts = wordCounts.get(word);
        if (counts == null) {
            counts = 0;
        }
        counts++;
        wordCounts.put(word, counts);
        // Emit the updated running count for this word
        collector.emit(new Values(word, counts));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "counts"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
KafkaOutputBolt

package com.ccc.storm.learning.bolt.kafka;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.util.*;

/**
 * @author ccc
 */
public class KafkaOutputBolt implements IBasicBolt {

    // Latest count seen for each word; globalGrouping routes every tuple
    // to a single task, so this map holds the complete result
    private Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        Integer counts = tuple.getIntegerByField("counts");
        this.wordCounts.put(word, counts);
    }

    @Override
    public void cleanup() {
        // Print the final counts in sorted order. Note that cleanup() is only
        // reliably invoked in local mode; on a cluster it may never be called.
        List<String> keyList = new ArrayList<>(wordCounts.keySet());
        Collections.sort(keyList);
        keyList.forEach(key -> System.out.println(key + "->" + wordCounts.get(key)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing downstream
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
The code above is only a general skeleton; it can be optimized further in real development.
For more content, follow the WeChat official account: 大資料開發與學習茶館.