Storm之路-WordCount-實例
初學storm,有不足的地方還請糾正。
網上看了很多wordcount實例,發現都不是我想要的。
實現場景:統計shengjing.txt詞頻到集合,一次打印結果。
● 消息源Spout
繼承BaseRichSpout類 / 實現IRichSpout接口
open,初始化動作;
nextTuple,消息接入,執行數據發射;
ack,tuple成功處理後調用;
fail,tuple處理失敗後調用;
declareOutputFields,聲明輸出字段;
● 處理單元Bolt
繼承BaseBasicBolt類 / BaseWindowedBolt / 實現IRichBolt接口
prepare,worker啟動時初始化;
execute,接收一個tuple / tupleWindow並執行邏輯處理,然後發射出去;
cleanup,關閉前調用;
declareOutputFields,聲明輸出字段;
● 項目結構
● pom.xml文件,配置項目jar依賴
<!-- Maven descriptor for com.scps.storm:storm-example 0.0.1; its only declared dependency is org.apache.storm:storm-core 1.1.0. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.scps.storm</groupId> <artifactId>storm-example</artifactId> <version>0.0.1</version> <name>storm.example</name> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.0</version> </dependency> </dependencies> </project>
● WordTopology.java文件,入口類,實例Topology、Spout、Bolt,配置等
1 package com.scps.storm.helloword; 2 3 import java.util.concurrent.TimeUnit; 4 5 import org.apache.storm.Config; 6 import org.apache.storm.LocalCluster; 7 import org.apache.storm.StormSubmitter; 8 import org.apache.storm.generated.AlreadyAliveException; 9 import org.apache.storm.generated.AuthorizationException; 10 import org.apache.storm.generated.InvalidTopologyException; 11 import org.apache.storm.topology.TopologyBuilder; 12 import org.apache.storm.topology.base.BaseWindowedBolt.Duration; 13 import org.apache.storm.tuple.Fields; 14 15 import com.scps.storm.helloword.bolt.SlidingWindowBolt; 16 import com.scps.storm.helloword.bolt.WordCountBolt; 17 import com.scps.storm.helloword.bolt.WordFinalBolt; 18 import com.scps.storm.helloword.bolt.WordSplitBolt; 19 import com.scps.storm.helloword.spout.WordReaderSpout; 20 21 public class WordTopology { 22 23 public static void main(String[] args) { 24 25 TopologyBuilder builder = new TopologyBuilder(); 26 27 // 1個task去讀文件 28 builder.setSpout("word-reader", new WordReaderSpout(), 1); 29 30 // 2個task分割行 31 builder.setBolt("word-split", new WordSplitBolt(), 2).shuffleGrouping("word-reader"); 32 33 // 2個task分批統計,並發送相同的word到同一個task 34 builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word")); 35 36 // 1個task匯總,每隔3秒統計最近5秒的tuple,SlidingWindow滑動窗口(間隔) 37 // builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count"); 38 // 1個task匯總,統計5秒內的tuple,不能超過15秒?提示超時錯誤,TumblingWindow滾動窗口 39 builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withTumblingWindow(new Duration(5, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count"); 40 41 // 1個task輸出 42 builder.setBolt("word-final", new WordFinalBolt(), 1).shuffleGrouping("sliding-window-bolt"); 43 44 Config conf = new Config(); 45 46 conf.setDebug(false); 47 48 if (args != null && args.length > 0) { 49 50 // 在集群運行,需要mvn 
package編譯 51 // bin/storm jar "/root/storm-example-0.0.1.jar" com.scps.storm.helloword.WordTopology "http://nimbus:8080/uploads/shengjing.txt" wordcount 52 53 try { 54 55 String file = args[0]; 56 String name = args[1]; 57 58 conf.put("file", file); 59 // conf.setNumWorkers(2); 60 61 StormSubmitter.submitTopology(name, conf, builder.createTopology()); 62 63 } catch (AlreadyAliveException e) { 64 65 e.printStackTrace(); 66 67 } catch (InvalidTopologyException e) { 68 69 e.printStackTrace(); 70 71 } catch (AuthorizationException e) { 72 73 e.printStackTrace(); 74 } 75 76 } else { 77 78 // 直接在eclipse中運行 79 80 conf.put("file", "C:\\Users\\Administrator\\Downloads\\shengjing1.txt"); 81 // conf.put("file", "http://192.168.100.170:8080/uploads/shengjing.txt"); 82 // conf.setMaxTaskParallelism(2); // 設置最大task數 83 LocalCluster cluster = new LocalCluster(); 84 cluster.submitTopology("wordcount", conf, builder.createTopology()); 85 } 86 } 87 }View Code
● WordReaderSpout.java文件,讀取txt文件,發送行
1 package com.scps.storm.helloword.spout; 2 3 import java.io.BufferedReader; 4 import java.io.FileInputStream; 5 import java.io.FileNotFoundException; 6 import java.io.IOException; 7 import java.io.InputStream; 8 import java.io.InputStreamReader; 9 import java.io.UnsupportedEncodingException; 10 import java.net.MalformedURLException; 11 import java.net.URL; 12 import java.net.URLConnection; 13 import java.text.SimpleDateFormat; 14 import java.util.Date; 15 import java.util.Map; 16 17 import org.apache.storm.spout.SpoutOutputCollector; 18 import org.apache.storm.task.TopologyContext; 19 import org.apache.storm.topology.IRichSpout; 20 import org.apache.storm.topology.OutputFieldsDeclarer; 21 import org.apache.storm.tuple.Fields; 22 import org.apache.storm.tuple.Values; 23 import org.apache.storm.utils.Utils; 24 25 public class WordReaderSpout implements IRichSpout { 26 27 private static final long serialVersionUID = 1L; 28 private SpoutOutputCollector outputCollector; 29 private String filePath; 30 private boolean completed = false; 31 32 public void ack(Object arg0) { 33 34 } 35 36 public void activate() { 37 38 } 39 40 public void close() { 41 42 } 43 44 public void deactivate() { 45 46 } 47 48 public void fail(Object arg0) { 49 50 } 51 52 @SuppressWarnings("rawtypes") 53 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 54 55 filePath = conf.get("file").toString(); 56 outputCollector = collector; 57 } 58 59 public void nextTuple() { 60 61 if (!completed) { 62 63 String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); 64 System.out.println("WordReaderSpout nextTuple, " + time); 65 66 String line = ""; 67 InputStream inputStream = null; 68 InputStreamReader inputStreamReader = null; 69 BufferedReader reader = null; 70 71 try { 72 73 // filePath = "http://192.168.100.170:8080/uploads/shengjing.txt"; 74 // filePath = "C:\\Users\\Administrator\\Downloads\\shengjing.txt"; 75 76 if (filePath.startsWith("http://")) 
{ // 遠程文件 77 URL url = new URL(filePath); 78 URLConnection urlConn = url.openConnection(); 79 inputStream = urlConn.getInputStream(); 80 } else { // 本地文件 81 inputStream = new FileInputStream(filePath); 82 } 83 84 inputStreamReader = new InputStreamReader(inputStream, "utf-8"); 85 reader = new BufferedReader(inputStreamReader); 86 while ((line = reader.readLine()) != null) { 87 outputCollector.emit(new Values(line)); 88 } 89 90 } catch (MalformedURLException e) { 91 e.printStackTrace(); 92 } catch (FileNotFoundException e) { 93 e.printStackTrace(); 94 } catch (UnsupportedEncodingException e) { 95 e.printStackTrace(); 96 } catch (IOException e) { 97 e.printStackTrace(); 98 } finally { 99 completed = true; 100 try { 101 if (reader != null) { 102 reader.close(); 103 } 104 if (inputStreamReader != null) { 105 inputStreamReader.close(); 106 } 107 if (inputStream != null) { 108 inputStream.close(); 109 } 110 } catch (IOException e) { 111 e.printStackTrace(); 112 } 113 } 114 } 115 116 Utils.sleep(20000); 117 } 118 119 public void declareOutputFields(OutputFieldsDeclarer declarer) { 120 121 declarer.declare(new Fields("line")); 122 } 123 124 public Map<String, Object> getComponentConfiguration() { 125 126 return null; 127 } 128 }View Code
使用集群測試時,先把txt文件上傳到nimbus的ui裏,隨機指派supervisor遠程讀取文件。
● WordSplitBolt.java文件,接收行,分割行,發送詞
1 package com.scps.storm.helloword.bolt; 2 3 import java.util.Map; 4 5 import org.apache.storm.task.OutputCollector; 6 import org.apache.storm.task.TopologyContext; 7 import org.apache.storm.topology.IRichBolt; 8 import org.apache.storm.topology.OutputFieldsDeclarer; 9 import org.apache.storm.tuple.Fields; 10 import org.apache.storm.tuple.Tuple; 11 import org.apache.storm.tuple.Values; 12 13 public class WordSplitBolt implements IRichBolt { 14 15 private static final long serialVersionUID = 1L; 16 private OutputCollector outputCollector; 17 18 @SuppressWarnings("rawtypes") 19 public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 20 21 outputCollector = collector; 22 } 23 24 public void execute(Tuple input) { 25 26 String line = input.getStringByField("line"); 27 28 line = line.trim(); 29 line = line.replace(",", " "); 30 line = line.replace(".", " "); 31 line = line.replace(":", " "); 32 line = line.replace(";", " "); 33 line = line.replace("?", " "); 34 line = line.replace("!", " "); 35 line = line.replace("(", " "); 36 line = line.replace(")", " "); 37 line = line.replace("[", " "); 38 line = line.replace("]", " "); 39 line = line.trim(); 40 41 String[] words = line.split(" "); 42 for (String word : words) { 43 word = word.trim(); 44 if (!"".equals(word)) { 45 outputCollector.emit(new Values(word)); 46 } 47 } 48 } 49 50 public void declareOutputFields(OutputFieldsDeclarer declarer) { 51 52 declarer.declare(new Fields("word")); 53 } 54 55 public void cleanup() { 56 57 } 58 59 public Map<String, Object> getComponentConfiguration() { 60 61 return null; 62 } 63 }View Code
● WordCountBolt.java文件,接收詞,統計詞,發送集合
1 package com.scps.storm.helloword.bolt; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 import org.apache.storm.task.OutputCollector; 7 import org.apache.storm.task.TopologyContext; 8 import org.apache.storm.topology.IRichBolt; 9 import org.apache.storm.topology.OutputFieldsDeclarer; 10 import org.apache.storm.tuple.Fields; 11 import org.apache.storm.tuple.Tuple; 12 import org.apache.storm.tuple.Values; 13 14 public class WordCountBolt implements IRichBolt { 15 16 private static final long serialVersionUID = 1L; 17 Map<String, Integer> counter; 18 private OutputCollector outputCollector; 19 20 @SuppressWarnings("rawtypes") 21 public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 22 23 counter = new HashMap<String, Integer>(); 24 outputCollector = collector; 25 } 26 27 public void execute(Tuple input) { 28 29 String word = input.getStringByField("word"); 30 int count; 31 32 if (!counter.containsKey(word)) { 33 count = 1; 34 } else { 35 count = counter.get(word) + 1; 36 } 37 38 counter.put(word, count); 39 outputCollector.emit(new Values(word, count)); 40 } 41 42 public void declareOutputFields(OutputFieldsDeclarer declarer) { 43 44 declarer.declare(new Fields("word", "count")); 45 } 46 47 public void cleanup() { 48 49 } 50 51 public Map<String, Object> getComponentConfiguration() { 52 53 return null; 54 } 55 }View Code
● SlidingWindowBolt.java文件,接收集合,合並集合,發送集合
1 package com.scps.storm.helloword.bolt; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 import java.util.HashMap; 6 import java.util.Map; 7 8 import org.apache.storm.task.OutputCollector; 9 import org.apache.storm.task.TopologyContext; 10 import org.apache.storm.topology.OutputFieldsDeclarer; 11 import org.apache.storm.topology.base.BaseWindowedBolt; 12 import org.apache.storm.tuple.Fields; 13 import org.apache.storm.tuple.Tuple; 14 import org.apache.storm.tuple.Values; 15 import org.apache.storm.windowing.TupleWindow; 16 17 public class SlidingWindowBolt extends BaseWindowedBolt { 18 19 private static final long serialVersionUID = 1L; 20 Map<String, Integer> counter; 21 private OutputCollector outputCollector; 22 23 @SuppressWarnings("rawtypes") 24 public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 25 26 counter = new HashMap<String, Integer>(); 27 outputCollector = collector; 28 } 29 30 public void execute(TupleWindow inputWindow) { 31 32 String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); 33 System.out.println("SlidingWindowBolt execute, " + time); 34 35 for (Tuple input : inputWindow.get()) { 36 37 String word = input.getStringByField("word"); 38 int count = input.getIntegerByField("count"); 39 40 counter.put(word, count); 41 } 42 43 outputCollector.emit(new Values(counter)); 44 } 45 46 public void declareOutputFields(OutputFieldsDeclarer declarer) { 47 48 declarer.declare(new Fields("counter")); 49 } 50 }View Code
● WordFinalBolt.java文件,接收集合,打印集合
1 package com.scps.storm.helloword.bolt; 2 3 import java.text.SimpleDateFormat; 4 import java.util.ArrayList; 5 import java.util.Collections; 6 import java.util.Date; 7 import java.util.List; 8 import java.util.Map; 9 10 import org.apache.storm.task.OutputCollector; 11 import org.apache.storm.task.TopologyContext; 12 import org.apache.storm.topology.IRichBolt; 13 import org.apache.storm.topology.OutputFieldsDeclarer; 14 import org.apache.storm.tuple.Tuple; 15 16 public class WordFinalBolt implements IRichBolt { 17 18 private static final long serialVersionUID = 1L; 19 20 @SuppressWarnings("rawtypes") 21 public void prepare(Map conf, TopologyContext context, OutputCollector collector) { 22 23 } 24 25 @SuppressWarnings("unchecked") 26 public void execute(Tuple input) { 27 28 Map<String, Integer> counter = (Map<String, Integer>) input.getValueByField("counter"); 29 List<String> keys = new ArrayList<String>(); 30 keys.addAll(counter.keySet()); 31 Collections.sort(keys); 32 String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); 33 System.out.println("-----------------begin------------------, " + time); 34 for (String key : keys) { 35 System.out.println(key + " : " + counter.get(key)); 36 } 37 System.out.println("-----------------end--------------------, " + time); 38 } 39 40 public void cleanup() { 41 42 } 43 44 public void declareOutputFields(OutputFieldsDeclarer declarer) { 45 46 } 47 48 public Map<String, Object> getComponentConfiguration() { 49 50 return null; 51 } 52 }View Code
● 項目源碼文件地址:https://pan.baidu.com/s/1mhZtvq4 密碼:ypbc
Storm之路-WordCount-實例