Implementing a Word Count Example with Storm
阿新 • Published 2018-12-17
Requirement
Count, in real time, the total occurrences of the words emitted into the Storm framework.
Analysis
Design a topology that counts how often each word appears in the text. The topology consists of three components, WordCountSpout → WordCountSplitBolt → WordCountBolt, plus a driver class, WordCountMain.
(1) WordCountSpout: the data source; it picks one sentence at random from a known set of English sentences and emits it.
package storm.wordcount;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
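The spout implementation itself is cut off after the imports above. Below is a minimal sketch of what WordCountSpout might look like, assuming a small hard-coded list of English sentences and an output field named "sentence" (both are assumptions, not taken from the original post):

package storm.wordcount;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class WordCountSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private Random random;

    // Sample sentences; the actual sentences used in the original post are unknown
    private String[] sentences = {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "i am at two with nature"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Pick a random sentence and emit it; sleep briefly so the console is not flooded
        String sentence = sentences[random.nextInt(sentences.length)];
        collector.emit(new Values(sentence));
        Utils.sleep(500);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The field name "sentence" is an assumption; the split bolt reads the tuple by index
        declarer.declare(new Fields("sentence"));
    }
}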
(2) WordCountSplitBolt: splits each line of text (a sentence) into individual words.
package storm.wordcount;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class WordCountSplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // Receive the data
    @Override
    public void execute(Tuple input) {
        // 1. Get the incoming line
        String line = input.getString(0);
        // 2. Split it into words
        String[] splits = line.split(" ");
        // 3. Emit each word with a count of 1
        for (String word : splits) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the output fields
        declarer.declare(new Fields("word", "num"));
    }
}
(3) WordCountBolt: accumulates the count for each word.
package storm.wordcount;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {

    // Key is the word, value is how many times the word has been seen
    private Map<String, Integer> map = new HashMap<>();

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // 1. Get the incoming word and its count
        String word = input.getString(0);
        Integer num = input.getInteger(1);
        // 2. Business logic: accumulate the count
        if (map.containsKey(word)) {
            // The word has been counted before; add to its existing count
            Integer count = map.get(word);
            map.put(word, count + num);
        } else {
            map.put(word, num);
        }
        // 3. Print to the console
        System.err.println(Thread.currentThread().getId() + " word : " + word + " num: " + map.get(word));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream, so no output fields are declared
    }
}
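Because the driver below wires WordCountBolt in with fieldsGrouping on the "word" field, every occurrence of a given word is routed to the same bolt task, so each task's in-memory HashMap keeps a consistent count for the words it is responsible for.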
(4) WordCountMain: the driver
1. Create the topology (TopologyBuilder) object
2. Set up the spout and bolts
3. Configure the number of workers
4. Submit the topology
package storm.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountMain {

    public static void main(String[] args) {
        // 1. Build the topology: spout -> split bolt -> count bolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WordCountSpout", new WordCountSpout(), 1);
        builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 2)
               .shuffleGrouping("WordCountSpout");
        builder.setBolt("WordCountBolt", new WordCountBolt(), 4)
               .fieldsGrouping("WordCountSplitBolt", new Fields("word"));

        // 2. Create the configuration
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 3. Submit: to the cluster if a topology name is given, otherwise run locally
        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordtopology", conf, builder.createTopology());
        }
    }
}
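To try the topology on a running Storm cluster, package the classes into a jar and submit it with the storm client, passing a topology name as the first argument; the jar name below is only an assumed example:

    storm jar wordcount.jar storm.wordcount.WordCountMain wordcount

With no arguments, main falls back to LocalCluster and runs the whole topology inside a single local JVM, which is convenient for checking the word counts printed by WordCountBolt.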