
[A First Look at Storm] wordcount demo

My current work requires getting familiar with Storm, so I wrote a small demo to get some hands-on practice.
The code is adapted, with minor modifications, from the official Apache Storm GitHub project: https://github.com/apache/storm
1. pom.xml dependency

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <type>jar</type>
    <version>1.0.0</version>
</dependency>

2. Spout
A Spout is the component in Storm that produces the stream of tuples. It typically reads from an external data source and converts the data into tuples inside the topology, playing the active role.
The nextTuple method is called over and over; this is where the data is produced.
In this example, a sentence is picked at random from an array and emitted as the tuple stream.

public class RandomSentenceSpout extends BaseRichSpout {
  private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);

  SpoutOutputCollector _collector;
  Random _rand;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{
        sentence("the cow jumped over the moon"),
        sentence("an apple a day keeps the doctor away"),
        sentence("four score and seven years ago"),
        sentence("snow white and the seven dwarfs"),
        sentence("i am at two with nature")};
    final String sentence = sentences[_rand.nextInt(sentences.length)];
    LOG.info("Emitting tuple: {}", sentence);
    _collector.emit(new Values(sentence));
  }

  protected String sentence(String input) {
    return input;
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}
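
The empty ack and fail methods above only matter when tuples are emitted with a message ID. As a minimal sketch (not part of the original demo; the log messages are mine), the spout could attach an ID so Storm tracks each tuple and calls back into ack()/fail():

  // Sketch only: this would replace the plain emit in nextTuple() above (needs java.util.UUID).
  _collector.emit(new Values(sentence), UUID.randomUUID());

  @Override
  public void ack(Object msgId) {
    LOG.debug("tuple {} fully processed", msgId);
  }

  @Override
  public void fail(Object msgId) {
    LOG.warn("tuple {} failed; a real spout could re-emit it here", msgId);
  }

Because both bolts below extend BaseBasicBolt, their emits are automatically anchored to the input tuple and the input is acked after execute returns, so the tuple tree completes and the spout's ack() eventually fires.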

3. Bolt
A Bolt is the component in a topology that receives and processes data, playing the passive role.
The execute method processes each incoming tuple and emits new tuples downstream.
Two bolts are defined here: the first splits each incoming sentence into words, and the second counts the occurrences of each word.

public class SplitSentenceBlot extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        String[] words = word.split(" ");
        for (String s : words) {
            System.out.println("==========" + s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
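
SplitSentenceBlot extends BaseBasicBolt, which anchors every emitted tuple to the input and acks the input automatically once execute returns. For comparison, here is a rough sketch (the class name is mine, not from the original project) of the same split logic on BaseRichBolt, where that bookkeeping is manual:

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceRichBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        for (String s : tuple.getString(0).split(" ")) {
            collector.emit(tuple, new Values(s)); // anchor to the input tuple by hand
        }
        collector.ack(tuple); // ack explicitly, otherwise the tuple will eventually time out
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}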
public class WordCountBlot extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);
        System.out.println("==============[" + word + "]:" + count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
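
The counts map lives in memory per bolt task, and fieldsGrouping (used in the topology below) routes the same word to the same task, so each task keeps a consistent count for its share of the words. As a small sketch (not in the original post), WordCountBlot could also override cleanup(), which Storm calls when the topology is killed in local mode, to print the final tallies:

    @Override
    public void cleanup() {
        // Called on local-mode shutdown; not guaranteed on a real cluster.
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }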

4. Topology
The topology is the key concept in Storm: it is essentially a real-time application running on Storm, and it is the main class defined here.
See the inline comments for the details.

public class SimpleTopology {
    public static void main(String[] args) throws Exception{
        // Instantiate the TopologyBuilder.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Set the spout node and its parallelism hint, which controls how many threads this component gets in the cluster.
        topologyBuilder.setSpout("spout", new RandomSentenceSpout(), 1);
        // Set the processing (bolt) nodes and their parallelism. shuffleGrouping makes the split node receive tuples from the spout at random.
        topologyBuilder.setBolt("split", new SplitSentenceBlot(), 3).shuffleGrouping("spout");
        topologyBuilder.setBolt("count", new WordCountBlot(), 3).fieldsGrouping("split", new Fields("word"));
        String topologyName = "word-count";

        Config config = new Config();
//        config.setDebug(true);
        if (args != null && args.length > 0) {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            // Startup code for running in local mode.
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
        }
    }
}
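
As written, the local-mode branch keeps the LocalCluster running until the process is killed. A minimal sketch (my addition, not in the original code) of letting it run briefly and then shutting down cleanly, appended at the end of the else branch:

            Utils.sleep(10000);                  // let the topology process for ~10 seconds
            cluster.killTopology(topologyName);  // also triggers cleanup() on the bolts
            cluster.shutdown();

To run on a real cluster instead, package the project into a jar and submit it through the storm client, along the lines of storm jar your-topology.jar your.package.SimpleTopology word-count (jar name and package are placeholders); args[0] then becomes the topology name passed to StormSubmitter.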