Storm案例之詞頻統計
阿新 • • 發佈:2018-11-14
1.案例需求
在本地模式下使用Storm實現統計指定檔案中的詞頻個數統計
2.需求分析
Spout來讀取指定檔案的資料,並把每一行資料傳送出去
Bolt來實現具體邏輯,單詞分割和統計
將結果輸出到控制檯
Spout——>Bolt——>Bolt
3.匯入Storm的依賴,在上一篇求和案例中有這個依賴,這裡就不再重複了
4.具體程式碼
package cn.ysjh; import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.io.File; import java.io.IOException; import java.util.*; public class WordcountStorm { private static class DataSourceSpout extends BaseRichSpout{ private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector=spoutOutputCollector; } /* 讀取指定資料夾下的資料:D:\測試資料 把每一行資料發射出去 */ @Override public void nextTuple() { //獲取檔案,txt是指只讀取指定資料夾下的txt字尾的檔案,true是指是否支援遞迴 Collection<File> files = FileUtils.listFiles(new File("D:\\測試資料\\storm"), new String[]{"txt"}, true); for(File file:files){ try { //獲取檔案中的內容 List<String> lines = FileUtils.readLines(file); //獲取檔案中的每行內容,並將它發射出去 for(String line:lines){ this.spoutOutputCollector.emit(new Values(line)); } //資料讀取完畢之後改變檔名,否則會一直執行 FileUtils.moveFile(file, new File(file.getAbsolutePath() + System.currentTimeMillis())); } catch (IOException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("lines")); } } /* 詞頻分割Bolt */ private static class SplitBolt extends BaseRichBolt{ private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector=outputCollector; } /* 對lines按照逗號進行切分 */ @Override public void execute(Tuple tuple) { String lines = tuple.getStringByField("lines"); String[] split = lines.split(","); //將資料發射到下一個Bolt for (String word:split){ this.outputCollector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("words")); } } /* 詞頻統計Bolt */ private static class CountBolt extends BaseRichBolt{ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } Map<String,Integer> map=new HashMap<>(); @Override public void execute(Tuple tuple) { String words = tuple.getStringByField("words"); Integer count = map.get(words); if(count==null){ count=0; } count++; map.put(words,count); System.out.println("---------------"); Set<Map.Entry<String, Integer>> set = map.entrySet(); for (Map.Entry<String, Integer> entry:set){ System.out.println(entry); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } } public static void main(String[] args){ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout",new DataSourceSpout()); builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("DataSourceSpout"); builder.setBolt("CountBolt",new CountBolt()).shuffleGrouping("SplitBolt"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("WordcountStorm",new Config(),builder.createTopology()); } }
5.執行截圖
可以看出,Storm是一行一行的讀取資料的,讀取一行統計一行並進行累加
注意: 檔案中的資料讀取完之後會將檔名改變,否則會一直重複讀取該檔案
前面的求和和這裡的詞頻統計案例都是在本地模式下執行的,如果想要提交到叢集上進行執行測試,需要將程式碼中的LocalCluster修改為StormSubmitter,但是最好的方式是在程式碼中寫入一個if判斷,如果是本地模式就使用LocalCluster,叢集模式就使用StormSubmitter,這裡感興趣的可以試一試
如果要在叢集上執行,使用
storm jar jar包目錄 jar包中的類名
來執行Storm任務