1. 程式人生 > >Storm案例之詞頻統計

Storm案例之詞頻統計

1.案例需求

在本地模式下使用Storm實現統計指定檔案中的詞頻個數統計

2.需求分析

Spout來讀取指定檔案的資料,並把每一行資料傳送出去

Bolt來實現具體邏輯,單詞分割和統計

將結果輸出到控制檯

Spout——>Bolt——>Bolt

3.匯入Storm的依賴,在上一篇求和案例中有這個依賴,這裡就不再重複了

4.具體程式碼

package cn.ysjh;


import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

public class WordcountStorm {

    private static class DataSourceSpout extends BaseRichSpout{

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
           this.spoutOutputCollector=spoutOutputCollector;
        }



        /*
        讀取指定資料夾下的資料:D:\測試資料
        把每一行資料發射出去
         */
        @Override
        public void nextTuple() {

            //獲取檔案,txt是指只讀取指定資料夾下的txt字尾的檔案,true是指是否支援遞迴
            Collection<File> files = FileUtils.listFiles(new File("D:\\測試資料\\storm"), new String[]{"txt"}, true);

            for(File file:files){
                try {
                    //獲取檔案中的內容
                    List<String> lines = FileUtils.readLines(file);

                    //獲取檔案中的每行內容,並將它發射出去
                    for(String line:lines){

                        this.spoutOutputCollector.emit(new Values(line));

                    }

                    //資料讀取完畢之後改變檔名,否則會一直執行
                    FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis()));



                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
           outputFieldsDeclarer.declare(new Fields("lines"));
        }
    }



    /*
    詞頻分割Bolt
     */
    private static class SplitBolt extends BaseRichBolt{

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
          this.outputCollector=outputCollector;
        }


        /*
        對lines按照逗號進行切分
         */
        @Override
        public void execute(Tuple tuple) {

            String lines = tuple.getStringByField("lines");
            String[] split = lines.split(",");

            //將資料發射到下一個Bolt
            for (String word:split){
                this.outputCollector.emit(new Values(word));
            }

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
          outputFieldsDeclarer.declare(new Fields("words"));
        }
    }




    /*
    詞頻統計Bolt
     */
    private static class CountBolt extends BaseRichBolt{

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

        }

       Map<String,Integer> map=new HashMap<>();

        @Override
        public void execute(Tuple tuple) {

            String words = tuple.getStringByField("words");
            Integer count = map.get(words);

            if(count==null){
                count=0;
            }
                count++;



            map.put(words,count);


            System.out.println("---------------");
            Set<Map.Entry<String, Integer>> set = map.entrySet();

            for (Map.Entry<String, Integer> entry:set){
               System.out.println(entry);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }
    }




    public static void main(String[] args){

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout",new DataSourceSpout());
        builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt",new CountBolt()).shuffleGrouping("SplitBolt");


        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("WordcountStorm",new Config(),builder.createTopology());

    }
}

5.執行截圖

可以看出,Storm是一行一行的讀取資料的,讀取一行統計一行並進行累加

注意:     檔案中的資料讀取完之後會將檔名改變,否則會一直重複讀取該檔案

 

前面的求和和這裡的詞頻統計案例都是在本地模式下執行的,如果想要提交到叢集上進行執行測試,需要將程式碼中的LocalCluster修改為StormSubmitter,但是最好的方式是在程式碼中寫入一個if判斷,如果是本地模式就使用LocalCluster,叢集模式就使用StormSubmitter,這裡感興趣的可以試一試

如果要在叢集上執行,使用

storm jar jar包目錄 jar包中的類名

來執行Storm任務