使用Storm進行詞頻統計
阿新 • • 發佈:2018-10-31
date 業務 out eric args private shuf err base
詞頻統計
1.需求:讀取指定目錄的數據,並且實現單詞計數功能
2.實現方案:
Spout用於讀取指定文件夾(目錄),讀取文件,將文件的每一行發射到Bolt
SplitBolt用於接收Spout發射過來的數據,並拆分,發射到CountBolt
CountBolt接收SplitBolt發送的每一個單詞,進行單詞計數操作
3.拓撲設計:
DataSourceSpout + SplitBolt + CountBolt
代碼如下:
package com.csylh; import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.io.File; import java.io.IOException; import java.util.*; /** * Description:使用Storm完成詞頻統計功能 * * @author: 留歌36 * Date:2018/9/4 9:28 */ public class LocalWordCountStormTopology { /** * 讀取數據並發送到Bolt上去 */ public static class DataSourceSpout extends BaseRichSpout{ //定義一個發射器 private SpoutOutputCollector collector; /** * 初始化方法 只是會被調用一次 * @param conf 配置參數 * @param context 上下文 * @param collector 數據發射器 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //對上面定義的的發射器進行賦初值 this.collector = collector; } /** * 用於數據的產生 * 業務: * 1.讀取指定目錄的文件夾下的數據 * 2.把每一行數據發射出去 */ @Override public void nextTuple() { // 獲取所有文件,這裏指定文件的後綴 Collection<File> files = FileUtils.listFiles(new File("E:\\StormText"),new String[]{"txt"},true); // 循環遍歷每一個文件 ==> 由於這裏指定的是文件夾下面的目錄 所以就是需要進行循環遍歷 for( File file : files){ try { // 獲取每一個文件的每一行 List<String> lines = FileUtils.readLines(file); for(String line : lines){ // 把每一行數據發射出去 this.collector.emit(new Values(line)); } //TODO 數據處理完畢之後 改名 否則的話 會一直執行的 FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis())); } catch (IOException e) { e.printStackTrace(); } } } /** * 聲明輸出字段名稱 * @param declarer */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } } /** * 對Spout發送過來的數據進行分割 */ public static class SplitBolt extends BaseRichBolt{ private OutputCollector collector; /** * 初始化方法 只是會被執行一次 * @param stormConf * @param context * @param collector Bolt的發射器,指定下一個Bolt的地址 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } /** * 用於獲取Spout發送過來的數據 * 業務邏輯 * spout發送過來的數據是一行一行的line * 這裏是需要line進行分割 * * @param input */ @Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split(","); for(String word : words){ // 這裏把每一個單詞發射出去 this.collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } /** * 詞頻匯總的Bolt */ public static class CountBolt extends BaseRichBolt{ /** * 由於這裏是不需要向外部發射 所以就不需要定義Collector * @param stormConf * @param context * @param collector */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } Map<String,Integer> map = new HashMap<String, Integer>(); /** * 業務邏輯 * 1.獲取每一個單詞 * 2.對每一個單詞進行匯總 * 3.輸出結果 * @param input */ @Override public void execute(Tuple input) { // 獲取每一個單詞 String word = input.getStringByField("word"); Integer count = map.get(word); if (count == null){ count = 0; } count++; // 對單詞進行匯總 map.put(word,count); // 輸出 System.out.println("~~~~~~~~~~~~~~~~~~~~~~~"); Set<Map.Entry<String,Integer>> entrySet = map.entrySet(); for(Map.Entry<String,Integer> entry :entrySet){ System.out.println(entry); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } /** * 主函數 * @param args */ public static void main(String[] args) { // 使用TopologyBuilder根據Spout和Bolt構建Topology TopologyBuilder builder = new TopologyBuilder(); // 設置Bolt和Spout 設置Spout和Bolt的關聯關系 builder.setSpout("DataSourceSpout",new DataSourceSpout()); builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("DataSourceSpout"); builder.setBolt("CountBolt",new CountBolt()).shuffleGrouping("SplitBolt"); // 創建一個本地的集群 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalWordCountStormTopology",new Config(),builder.createTopology()); } }
小結:開發Storm程序的步驟就是:
根據需求 設計實現方案 規劃拓撲
一般是先寫Spout數據產生器 發射數據到Bolt
接著,就是Bolt進行數據處理,如果有多個Bolt,非最後一個Bolt也要寫發射器Collector
最後一個Bolt直接輸出結果或者 輸出到HDFS或者關系型數據庫中
最終需要將Spout和Bolt進行組裝起來(借助TopologyBuilder)
使用Storm進行詞頻統計