1. 程式人生 > Storm實現單詞統計案例

Storm實現單詞統計案例

  1. 需求

    實時統計發射到Storm框架中的句子裡,每個單詞各自出現的總次數

  2. 分析

    設計一個topology,對Spout發送的句子中各單詞出現的頻率進行統計。整個topology分為三個部分,另外再加上一個負責組裝與提交topology的驅動類

    (1)WordCountSpout:資料來源,負責發送英文句子(本例為簡化起見,每隔約0.5秒發送同一條固定的句子;實際應用中可改為從多條已知的英文句子中隨機選取一條發送)

    package storm.wordcount;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; //傳送一條語句 public class WordCountSpout extends BaseRichSpout { private SpoutOutputCollector collector; @Override public void open(Map conf,
    TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { //傳送資料 collector.emit(new Values("shnad zhang1 zhsndga1 dasd a a b b c dd d dd")); //延時0.5 s try { Thread.sleep
    (500); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("love")); } }

    (2)WordCountSplitBolt:負責將單行文字記錄(句子)切分成單詞,並為每個單詞發送一個(word, 1)元組給下游

    package storm.wordcount;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Splits each incoming sentence into words and emits one (word, 1) tuple per
     * non-empty word.
     */
    public class WordCountSplitBolt extends BaseRichBolt{
        private OutputCollector collector;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // Keep the collector; execute() emits and acks through it.
            this.collector = collector;
        }
    
        /**
         * Splits the sentence in field 0 into words and emits (word, 1) for each word,
         * then acks the input tuple.
         *
         * @param input tuple whose first field is the sentence to split
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
    
            if (line != null) {
                // Trim, then split on runs of whitespace: splitting on a single " " would turn
                // repeated, leading or trailing spaces into empty "words" that get counted.
                for (String word : line.trim().split("\\s+")) {
                    // An empty/blank line still yields one empty token; skip it.
                    if (!word.isEmpty()) {
                        collector.emit(new Values(word, 1));
                    }
                }
            }
    
            // BaseRichBolt does not ack automatically; ack so the input tuple is completed
            // if the upstream emit is ever made reliable.
            collector.ack(input);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Field 0 "word": the word; field 1 "num": its count contribution (always 1 here).
            declarer.declare(new Fields("word", "num"));
        }
    }
    

    (3)WordCountBolt:負責對單詞的頻率進行累加,並在控制檯列印每個單詞目前的累計次數

    package storm.wordcount;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Keeps a running count per word for this task and prints the updated total for every
     * incoming (word, num) tuple. Emits nothing downstream.
     *
     * NOTE(review): counts are held in this bolt instance only; complete per-word totals
     * assume upstream uses fields grouping on "word" so each word always reaches the same task.
     */
    public class WordCountBolt extends BaseRichBolt {
        // Word -> number of occurrences seen by this task so far.
        private Map<String, Integer> map = new HashMap<>();
        private OutputCollector collector;
    
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // Keep the collector; execute() acks through it.
            this.collector = collector;
        }
    
        /**
         * Adds the tuple's count to the word's running total and prints the new total.
         *
         * @param input tuple with the word in field 0 and its count contribution in field 1
         */
        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            Integer num = input.getInteger(1);
    
            // Single lookup: start at num for a new word, otherwise add to the previous total.
            Integer previous = map.get(word);
            Integer total;
            if (previous == null) {
                total = num;
            } else {
                total = previous + num;
            }
            map.put(word, total);
    
            System.err.println(Thread.currentThread().getId() + " word : " + word + " num: " + total);
    
            // BaseRichBolt does not ack automatically; ack so the input tuple is completed.
            collector.ack(input);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Terminal bolt: declares no output fields.
        }
    }
    

    (4)WordCountMain驅動

    1. 建立拓撲物件 2. 設定spout與bolt(以及分組策略) 3. 配置worker開啟個數 4. 提交

    package storm.wordcount;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Driver for the word-count topology: builds spout -> split bolt -> count bolt and
     * submits it to a cluster (when a topology name is given) or to an in-process LocalCluster.
     */
    public class WordCountMain  {
        private static final String SPOUT_ID = "WordCountSpout";
        private static final String SPLIT_BOLT_ID = "WordCountSplitBolt";
        private static final String COUNT_BOLT_ID = "WordCountBolt";
    
        /**
         * Builds and submits the topology.
         *
         * @param args optional; {@code args[0]} is the topology name for cluster submission.
         *             With no arguments the topology runs in local mode as "wordtopology".
         * @throws IllegalStateException if cluster submission fails (cause preserved)
         */
        public static void main (String[] args){
            // 1. Build the topology.
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(SPOUT_ID, new WordCountSpout(), 1);
            // Sentences can go to any split task.
            builder.setBolt(SPLIT_BOLT_ID, new WordCountSplitBolt(), 2).shuffleGrouping(SPOUT_ID);
            // Fields grouping on "word" routes every occurrence of a word to the same count
            // task, so each task's per-word totals are complete.
            builder.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 4)
                    .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
    
            // 2. Configuration.
            Config conf = new Config();
            conf.setNumWorkers(2);
    
            // 3. Submit.
            if (args.length > 0) {
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (Exception e) {
                    // Fail with a non-zero exit instead of printing and exiting normally, so a
                    // script that launches the submission can detect the failure.
                    throw new IllegalStateException("Failed to submit topology " + args[0], e);
                }
            } else {
                // Local mode: this code never kills the topology or shuts the cluster down.
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("wordtopology", conf, builder.createTopology());
            }
        }
    }