Storm學習筆記(1)Hello WordCount - 單機模式
古人云,紙上得來終覺淺,絕知此事要躬行。翻譯過來,就是學東西哪有不踩坑的。
因為工作原因要折騰Storm,環境和第一個例子折騰了好久,搞完了回頭看,吐血的簡單。
Storm有兩種模式,單機和集群。入門當然選單機。
1、安裝JDK,配置Eclipse環境
2、建立一個Maven工程,在pom.xml加上這段:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.2</version>
<scope>compile</scope>
</dependency>
3、通過Maven建立項目和下載依賴包。
其實,所需要的storm-core-1.1.2.jar可以從官網下載的storm包裏面的lib目錄中找到。
Java在下不熟悉,也就不多說了。
4、參考官方或者各種教程的word-count例子編寫代碼。
5、在Eclipse裏面run起來就可以了。
無論是Storm本身還是ZooKeeper,在這個單機入門例子裏面其實都不需要另外安裝!因為LocalCluster會在同一個Java進程內模擬出整個集群。
就這麼簡單。
具體代碼來說,官方提供的storm-starter例子中,WordCountTopology.java挺適合入門的。
只是裏面有個坑:
官方採用了Python腳本(通過ShellBolt調用splitsentence.py)作為句子分割bolt的實現,如果運行環境中沒有Python或找不到該腳本,一跑就會出錯。
就是這段:
// storm-starter's original SplitSentence: the splitting is delegated to an external
// process started through ShellBolt. It therefore needs a `python` executable and the
// script splitsentence.py to be available when the topology runs; without them the
// bolt fails at runtime.
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
// The command ("python", "splitsentence.py") that ShellBolt launches for this bolt.
super("python", "splitsentence.py");
}// rest omitted
可以用這個類來替代:
1 public static class SplitSentence extends BaseBasicBolt{ 2 @OverrideView Code3 public void execute(Tuple tuple, BasicOutputCollector collector){ 4 // 接收到一個句子 5 String sentence = tuple.getString(0); 6 // 把句子切割為單詞 7 StringTokenizer iter = new StringTokenizer(sentence); 8 // 發送每一個單詞 9 while(iter.hasMoreElements()){ 10 collector.emit(new Values(iter.nextToken())); 11 } 12 } 13 14 @Override 15 public void declareOutputFields(OutputFieldsDeclarer declarer){ 16 // 定義一個字段 17 declarer.declare(new Fields("word")); 18 } 19 20 @Override 21 public Map<String, Object> getComponentConfiguration() { 22 return null; 23 } 24 }
Run起來以後,在Eclipse的Console窗口裏面可以看到運行的詳情。
完整代碼如下:
1 package storm.blueprints; 2 3 import org.apache.storm.spout.SpoutOutputCollector; 4 import org.apache.storm.task.TopologyContext; 5 import org.apache.storm.topology.OutputFieldsDeclarer; 6 import org.apache.storm.topology.base.BaseRichSpout; 7 import org.apache.storm.tuple.Fields; 8 import org.apache.storm.tuple.Values; 9 10 import org.apache.storm.utils.Utils; 11 import org.slf4j.Logger; 12 import org.slf4j.LoggerFactory; 13 14 import org.apache.storm.Config; 15 import org.apache.storm.LocalCluster; 16 import org.apache.storm.StormSubmitter; 17 import org.apache.storm.task.ShellBolt; 18 19 import org.apache.storm.topology.BasicOutputCollector; 20 import org.apache.storm.topology.IRichBolt; 21 import org.apache.storm.topology.TopologyBuilder; 22 import org.apache.storm.topology.base.BaseBasicBolt; 23 24 import org.apache.storm.tuple.Tuple; 25 import java.util.HashMap; 26 import java.util.Map; 27 28 29 import java.util.*; 30 31 public class HelloWordCount 32 { 33 public static class RandomSentenceSpout extends BaseRichSpout { 34 private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); 35 36 SpoutOutputCollector _collector; 37 Random _rand; 38 39 40 @Override 41 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 42 _collector = collector; 43 _rand = new Random(); 44 } 45 46 @Override 47 public void nextTuple() { 48 Utils.waitForMillis(100);//(100); 49 String[] sentences = new String[]{ 50 sentence("the cow jumped over the moon"), 51 sentence("an apple a day keeps the doctor away"), 52 sentence("four score and seven years ago"), 53 sentence("snow white and the seven dwarfs"), 54 sentence("i am at two with nature")}; 55 final String sentence = sentences[_rand.nextInt(sentences.length)]; 56 57 LOG.debug("Emitting tuple: {}", sentence); 58 59 _collector.emit(new Values(sentence)); 60 61 System.out.println("***" + sentence); 62 } 63 64 protected String sentence(String input) { 65 return input; 66 } 67 68 
@Override 69 public void ack(Object id) { 70 } 71 72 @Override 73 public void fail(Object id) { 74 } 75 76 @Override 77 public void declareOutputFields(OutputFieldsDeclarer declarer) { 78 declarer.declare(new Fields("sentence")); 79 } 80 } 81 82 83 // 定義個Bolt,用於將句子切分為單詞 84 public static class SplitSentence extends BaseBasicBolt{ 85 @Override 86 public void execute(Tuple tuple, BasicOutputCollector collector){ 87 // 接收到一個句子 88 String sentence = tuple.getString(0); 89 // 把句子切割為單詞 90 StringTokenizer iter = new StringTokenizer(sentence); 91 // 發送每一個單詞 92 while(iter.hasMoreElements()){ 93 collector.emit(new Values(iter.nextToken())); 94 } 95 } 96 97 @Override 98 public void declareOutputFields(OutputFieldsDeclarer declarer){ 99 // 定義一個字段 100 declarer.declare(new Fields("word")); 101 } 102 103 @Override 104 public Map<String, Object> getComponentConfiguration() { 105 return null; 106 } 107 } 108 109 // 定義一個Bolt,用於單詞計數 110 public static class WordCount extends BaseBasicBolt { 111 Map<String, Integer> counts = new HashMap<String, Integer>(); 112 113 @Override 114 public void execute(Tuple tuple, BasicOutputCollector collector){ 115 String word = tuple.getString(0); 116 Integer count = counts.get(word); 117 if (count == null) 118 count = 0; 119 count++; 120 counts.put(word, count); 121 122 System.out.println(word +" "+count); 123 } 124 125 @Override 126 public void declareOutputFields(OutputFieldsDeclarer declarer){ 127 // 定義兩個字段word和count 128 declarer.declare(new Fields("word","count")); 129 } 130 } 131 public static void main(String[] args) throws Exception 132 { 133 System.out.println("main"); 134 // 創建一個拓撲 135 TopologyBuilder builder = new TopologyBuilder(); 136 // 設置Spout,這個Spout的名字叫做"Spout",設置並行度為5 137 builder.setSpout("Spout", new RandomSentenceSpout(), 5); 138 // 設置slot——“split”,並行度為8,它的數據來源是spout的 139 builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("Spout"); 140 // 設置slot——“count”,你並行度為12,它的數據來源是split的word字段 141 builder.setBolt("count", new 
WordCount(), 12).globalGrouping("split");//, new Fields("word")); 142 143 Config conf = new Config(); 144 145 // 本地集群 146 LocalCluster cluster = new LocalCluster(); 147 148 System.out.println("LocalCluster"); 149 150 // 提交拓撲(該拓撲的名字叫word-count) 151 cluster.submitTopology("word-count", conf, builder.createTopology() ); 152 153 System.out.println("submitTopology"); 154 155 Utils.waitForSeconds(10); 156 cluster.killTopology("word-count"); 157 cluster.shutdown(); 158 } 159 } 160 161 public static class Utils { 162 163 public static void waitForSeconds(int seconds) { 164 try { 165 Thread.sleep(seconds * 1000); 166 } catch (InterruptedException e) { 167 } 168 } 169 170 public static void waitForMillis(long milliseconds) { 171 try { 172 Thread.sleep(milliseconds); 173 } catch (InterruptedException e) { 174 } 175 } 176 } 177 }View Code
請使用手機"掃一掃"x
Storm學習筆記(1)Hello WordCount - 單機模式