storm和kafka的wordCount
阿新 • • 發佈:2018-06-26
標籤:conf、scope、work、exe、本地、values、編寫代碼、base、shuffle
這是在 Windows 環境下安裝的 Kafka
在 pom.xml 中加入以下依賴
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.1</version> <!-- 本地測試註釋集群運行打開 --> <!-- <scope>provided</scope>--> </dependency>
編寫代碼
編寫SplitSentenceBolt
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;@Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector=outputCollector; } @Override public void execute(Tuple tuple) { //String sentece = tuple.getStringByField("sentence"); String sentece=tuple.getString(4); String[] words = sentece.split(" "); for (String word:words){ collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("words")); }
}
編寫WordCountBolt
public class WordCountBolt extends BaseRichBolt {
}private OutputCollector collector; private HashMap<String,Long> counts =null; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap<>(); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("words"); // String word =tuple.getString(0); Long count=this.counts.get(word); if(count==null){ count=0L; } count++; //出現就添加到map中,word相同的,會覆蓋掉 所以最後的word就是準確的數據 this.counts.put(word,count); this.collector.emit(new Values(word,count)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word","count")); }
編寫ReportBolt
public class ReportBolt extends BaseRichBolt {
private HashMap
}@Override public void execute(Tuple input) { String word=input.getStringByField("word"); Long count=input.getLongByField("count"); this.counts.put(word, count); System.out.println("--------FINAL COUNTS--------"); List<String> keys=new ArrayList<String>(); keys.addAll(this.counts.keySet()); Collections.sort(keys); for(String key:keys){ System.out.println(key+":"+this.counts.get(key)); } System.out.println("----------------------------"); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
編寫Topology
public class MainTopology {
public static void main(String[] args)throws Exception {
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig.Builder//設置kafka屬於哪個組 kafkabuilder.setGroupId("testgroup"); //創建kafkaspoutConfig KafkaSpoutConfig<String, String> build = kafkabuilder.build(); //通過kafkaspoutconfig獲取kafkaspout KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build); //設置四個線程接收數據 builder.setSpout("kafkaspout",kafkaSpout,4);
// builder.setBolt("printBolt", new PrintBolt(),2).localOrShuffleGrouping("kafkaspout");
builder.setBolt("split-bolt",new SplitSentenceBolt(),2).setNumTasks(4).shuffleGrouping("kafkaspout"); // 有時候我們需要將特定數據的tuple路由到特殊的bolt實例中,在此我們使用fieldsGrouping // 來保證所有"word"字段值相同的tuple會被路由到同一個WordCountBolt實例中 builder.setBolt("count-bolt",new WordCountBolt(),2).fieldsGrouping("split-bolt",new Fields("words")); builder.setBolt("report-bolt",new ReportBolt()).globalGrouping("count-bolt"); Config config=new Config(); config.setDebug(false); config.setNumWorkers(2); LocalCluster cluster =new LocalCluster(); cluster.submitTopology("kafkaspout",config,builder.createTopology()); }
storm和kafka的wordCount