
Storm Study Notes (1): Hello WordCount - Local Mode


As the ancients said, "what you learn on paper always feels shallow; to really understand something you have to do it yourself." In plain terms: you can't learn anything without stepping into a few pits.

I have to work with Storm for my job. Setting up the environment and getting the first example running took ages, and looking back after it was done, it was maddeningly simple.

Storm has two modes: local (single-machine) and cluster. For getting started, local mode is the obvious choice.

1. Install the JDK and set up Eclipse.

2. Create a Maven project and add this to pom.xml:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.2</version>
    <scope>compile</scope>
</dependency>

3. Let Maven build the project and download the dependencies.

In fact, the required storm-core-1.1.2.jar can also be found in the lib directory of the Storm distribution downloaded from the official site.

I'm not very familiar with Java, so I won't go into more detail here.

4. Write the code, following the official word-count example or any of the many tutorials out there.

5. Run it from Eclipse, and that's it.

A Storm installation, Zookeeper... for this single-machine getting-started example, none of it is actually needed!

It's that simple.
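To see why, here is a minimal sketch of what steps 4 and 5 boil down to: LocalCluster spins up an in-process simulation of a cluster, so the whole topology runs inside the JVM that Eclipse launches. The spout and bolt classes referenced here are the ones defined in the full listing further down; this fragment is essentially the body of its main method.

// Local-mode skeleton (sketch): everything runs in this JVM, no daemons needed.
// RandomSentenceSpout, SplitSentence and WordCount come from the full listing below.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("Spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("Spout");
builder.setBolt("count", new WordCount(), 12).globalGrouping("split");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", new Config(), builder.createTopology());
Thread.sleep(10000);                  // let it run for ten seconds
cluster.killTopology("word-count");
cluster.shutdown();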

As for the code itself, WordCountTopology.java from the official storm-starter examples is a good starting point.

There is one pit in it, though:

The official example implements the sentence-splitting bolt in Python. ShellBolt launches the script as a subprocess and talks to it over Storm's multilang protocol, so if Python (or the script) is not available in your environment, the topology fails as soon as it runs.

This is the offending part:

public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    // rest omitted

You can replace it with this class:

public static class SplitSentence extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Receive a sentence
        String sentence = tuple.getString(0);
        // Split the sentence into words
        StringTokenizer iter = new StringTokenizer(sentence);
        // Emit each word
        while (iter.hasMoreElements()) {
            collector.emit(new Values(iter.nextToken()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare a single field
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Once it is running, you can watch the details of the run in Eclipse's Console window.

The complete code is as follows:

package storm.blueprints;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.ShellBolt;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;

import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;

import java.util.*;

public class HelloWordCount
{
    public static class RandomSentenceSpout extends BaseRichSpout {
        private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);

        SpoutOutputCollector _collector;
        Random _rand;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        @Override
        public void nextTuple() {
            Utils.waitForMillis(100);
            String[] sentences = new String[]{
                    sentence("the cow jumped over the moon"),
                    sentence("an apple a day keeps the doctor away"),
                    sentence("four score and seven years ago"),
                    sentence("snow white and the seven dwarfs"),
                    sentence("i am at two with nature")};
            final String sentence = sentences[_rand.nextInt(sentences.length)];

            LOG.debug("Emitting tuple: {}", sentence);

            _collector.emit(new Values(sentence));

            System.out.println("***" + sentence);
        }

        protected String sentence(String input) {
            return input;
        }

        @Override
        public void ack(Object id) {
        }

        @Override
        public void fail(Object id) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    // Bolt that splits a sentence into words
    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Receive a sentence
            String sentence = tuple.getString(0);
            // Split the sentence into words
            StringTokenizer iter = new StringTokenizer(sentence);
            // Emit each word
            while (iter.hasMoreElements()) {
                collector.emit(new Values(iter.nextToken()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare a single field
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    // Bolt that counts words
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);

            System.out.println(word + "  " + count);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare two fields: word and count
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println("main");
        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout, named "Spout", with a parallelism of 5
        builder.setSpout("Spout", new RandomSentenceSpout(), 5);
        // Set the "split" bolt with a parallelism of 8; its input comes from the spout
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("Spout");
        // Set the "count" bolt with a parallelism of 12; its input comes from "split"
        // (the official starter uses fieldsGrouping("split", new Fields("word")) here instead)
        builder.setBolt("count", new WordCount(), 12).globalGrouping("split");

        Config conf = new Config();

        // Local (in-process) cluster
        LocalCluster cluster = new LocalCluster();

        System.out.println("LocalCluster");

        // Submit the topology (named "word-count")
        cluster.submitTopology("word-count", conf, builder.createTopology());

        System.out.println("submitTopology");

        Utils.waitForSeconds(10);
        cluster.killTopology("word-count");
        cluster.shutdown();
    }

    public static class Utils {

        public static void waitForSeconds(int seconds) {
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
            }
        }

        public static void waitForMillis(long milliseconds) {
            try {
                Thread.sleep(milliseconds);
            } catch (InterruptedException e) {
            }
        }
    }
}
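The StormSubmitter import in the listing is unused here; it belongs to the other mode mentioned at the beginning. For completeness, this is a sketch (not from the original post) of what cluster-mode submission would look like in place of the LocalCluster block: the topology-building code stays the same, the worker count is just an example, and the packaged jar then has to be submitted with the storm jar command against a real cluster with Zookeeper.

// Cluster-mode submission (sketch): replaces the LocalCluster section above.
Config conf = new Config();
conf.setNumWorkers(3);                 // illustrative worker count
StormSubmitter.submitTopology("word-count", conf, builder.createTopology());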
