Storm-wordcount實時統計單詞次數
阿新 • • 發佈:2018-12-28
一、本地模式
1、WordCountSpout類
package com.demo.wc;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Source of the word-count demo: repeatedly emits one fixed sentence.
 *
 * <p>Requirement: count words, e.g. "hello world hello Beijing China".
 * Components may implement {@code IRichSpout}/{@code IRichBolt} directly, but
 * extending the abstract classes {@code BaseRichSpout}/{@code BaseRichBolt}
 * is the common choice.
 */
public class WordCountSpout extends BaseRichSpout {

    /** Collector handed over in {@link #open}; used to emit tuples. */
    private SpoutOutputCollector collector;

    /**
     * Emits one sentence tuple and then pauses for 500 ms to throttle the demo.
     */
    @Override
    public void nextTuple() {
        // 1. Send the sentence on to the next bolt.
        collector.emit(new Values("I like China very much"));

        // 2. Delay before the next emit.
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still see
            // that it was interrupted instead of losing the signal here.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores the collector for later use by {@link #nextTuple()}.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit tuples
     */
    @Override
    @SuppressWarnings("rawtypes") // raw Map keeps the override valid whether or not the base method is generic
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the single output field, named "wordcount".
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field name under which the sentence is read downstream.
        declarer.declare(new Fields("wordcount"));
    }
}
2、WordCountSplitBolt類
package com.demo.wc;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Splits each incoming sentence on single spaces and emits one
 * {@code (word, 1)} tuple per non-empty word.
 */
public class WordCountSplitBolt extends BaseRichBolt {

    /** Collector used to forward tuples to the next bolt. */
    private OutputCollector collector;

    /**
     * Reads the "wordcount" field, splits it, and emits {@code (word, 1)} for
     * every non-empty token.
     *
     * @param in tuple carrying the sentence in its "wordcount" field
     */
    @Override
    public void execute(Tuple in) {
        // 1. Fetch the sentence.
        String line = in.getStringByField("wordcount");

        if (line != null) {
            // 2. Split on spaces; 3. emit <word, 1> for the summing bolt.
            for (String w : line.split(" ")) {
                // Leading or consecutive spaces yield "" tokens; do not count them as words.
                if (w.isEmpty()) {
                    continue;
                }
                // Anchor to the input tuple, per Storm's documented BaseRichBolt pattern.
                collector.emit(in, new Values(w, 1));
            }
        }

        // BaseRichBolt does not ack automatically, so acknowledge the input explicitly.
        collector.ack(in);
    }

    /**
     * Stores the collector for use in {@link #execute(Tuple)}.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit and ack tuples
     */
    @Override
    @SuppressWarnings("rawtypes") // raw Map keeps the override valid whether or not the base method is generic
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the two output fields: "word" and "sum".
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "sum"));
    }
}
3、WordCountBolt類
package com.demo.wc;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Terminal bolt of the demo: keeps a running count per word in memory and
 * prints the updated count for every tuple it receives.
 */
public class WordCountBolt extends BaseRichBolt {

    /** Running total per word, held by this bolt instance. */
    private Map<String, Integer> map = new HashMap<>();

    /** Collector kept only so that processed tuples can be acked. */
    private OutputCollector collector;

    /**
     * Adds the tuple's "sum" to the running total of its "word" and prints the
     * new total to standard output.
     *
     * @param in tuple with a String field "word" and an Integer field "sum"
     */
    @Override
    public void execute(Tuple in) {
        // 1. Fetch the data.
        String word = in.getStringByField("word");
        Integer sum = in.getIntegerByField("sum");

        // 2. Accumulate: one lookup, treating an unseen word as zero.
        Integer previous = map.get(word);
        Integer total = (previous == null) ? sum : Integer.valueOf(previous + sum);
        map.put(word, total);

        // 3. Print to the console.
        System.out.println(Thread.currentThread().getName() + "\t 單詞為:" + word + "\t 當前已出現次數為:" + total);

        // BaseRichBolt does not ack automatically, so acknowledge the input explicitly.
        collector.ack(in);
    }

    /**
     * Stores the collector so {@link #execute(Tuple)} can ack its input.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to ack tuples
     */
    @Override
    @SuppressWarnings("rawtypes") // raw Map keeps the override valid whether or not the base method is generic
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares nothing: this bolt emits no tuples.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
4、WordCountDriver類
package com.demo.wc; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; public class WordCountDriver { public static void main(String[] args) { //1.hadoop->Job storm->topology 建立拓撲 TopologyBuilder builder = new TopologyBuilder(); //2.指定設定 builder.setSpout("WordCountSpout", new WordCountSpout(), 1); builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).fieldsGrouping("WordCountSpout", new Fields("wordcount")); builder.setBolt("WordCountBolt", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word")); //3.建立配置資訊 Config conf = new Config(); //4.提交任務 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordcounttopology", conf, builder.createTopology()); } }
5、直接執行(4)裡面的main方法即可啟動本地模式。
二、叢集模式
前三個類和上面本地模式一樣,只有第4個類WordCountDriver不同:叢集模式改用StormSubmitter提交拓撲,拓撲名稱由命令列的第一個引數(args[0])傳入。
package com.demo.wc; import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; public class WordCountDriver { public static void main(String[] args) { //1.hadoop->Job storm->topology 建立拓撲 TopologyBuilder builder = new TopologyBuilder(); //2.指定設定 builder.setSpout("WordCountSpout", new WordCountSpout(), 1); builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).fieldsGrouping("WordCountSpout", new Fields("wordcount")); builder.setBolt("WordCountBolt", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word")); //3.建立配置資訊 Config conf = new Config(); //conf.setNumWorkers(10); //叢集模式 try { StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } //4.提交任務 //LocalCluster localCluster = new LocalCluster(); //localCluster.submitTopology("wordcounttopology", conf, builder.createTopology()); } }
把程式打成jar包放在啟動了Storm叢集的機器裡,在stormwordcount.jar所在目錄下執行
storm jar stormwordcount.jar com.demo.wc.WordCountDriver wordcount01
即可啟動程式。
三、併發度和分組策略
1、WordCountDriver_Shuffle類
package com.demo.wc; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; public class WordCountDriver_Shuffle { public static void main(String[] args) { //1.hadoop->Job storm->topology 建立拓撲 TopologyBuilder builder = new TopologyBuilder(); //2.指定設定 builder.setSpout("WordCountSpout", new WordCountSpout(), 2); builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 2).setNumTasks(4).shuffleGrouping("WordCountSpout"); builder.setBolt("WordCountBolt", new WordCountBolt(), 6).shuffleGrouping("WordCountSplitBolt"); //3.建立配置資訊 Config conf = new Config(); //conf.setNumWorkers(2); //叢集模式 // try { // StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); // } catch (Exception e) { // e.printStackTrace(); // } //4.提交任務 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordcounttopology", conf, builder.createTopology()); } }
2、併發度與分組策略