1. 程式人生 > >Storm/JStorm之TopologyBuilder原始碼閱讀

Storm/JStorm之TopologyBuilder原始碼閱讀

在Storm/JStorm中有一個類特別重要,主要用來構建Topology,這個類就是TopologyBuilder。
咱先看一下簡單的例子:

public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("input", new RandomSentenceSpout(), 2
); builder.setBolt("bolt_sentence", new SplitSentenceBolt(), 2) .shuffleGrouping("input"); // 本地模式:最主要用來除錯用 LocalCluster cluster = new LocalCluster(); System.out.println("start wordcount"); cluster.submitTopology("word count", conf, builder.createTopology()); }

在上面的main方法裡先建立TopologyBuilder物件,然後設定好已建立的Spout節點和Bolt節點,並用隨機分組(shuffleGrouping)將Spout和Bolt節點連線起來形成Topology。那TopologyBuilder是如何做的呢?請看下面TopologyBuilder原始碼:

/**
 * TopologyBuilder是一個用於構建Topology的工具類
 *
 */
public class TopologyBuilder {
    /**
     * 定義了類成員變數_bolts,用來存放IRichBolt型別的所有Bolt物件
     */
    private
Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>(); /** * 定義了類成員變數_spouts,用來存放IRichSpout型別的所有Spout物件 */ private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>(); /** * 定義了類成員變數_commons,存放了所有的Bolt和Spout物件 */ private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>(); // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>(); private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>(); /** * 根據傳入的Bolt和Spout物件構建StormTopology物件 * @return */ public StormTopology createTopology() { Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>(); Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>(); for (String boltId : _bolts.keySet()) { //根據boltId從_bolts中獲取到對應的bolt物件 IRichBolt bolt = _bolts.get(boltId); //設定對應ComponentCommon物件的streams(輸出的欄位列表以及是否是直接流)屬性值 ComponentCommon common = getComponentCommon(boltId, bolt); /** * 先將Bolts物件序列化得到陣列,再建立Bolt物件,所以所有在StormTopology中Bolts是物件序列化過後得到的位元組陣列. */ boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common)); } for (String spoutId : _spouts.keySet()) { //根據spoutId從_spouts中獲取到對應的spout物件 IRichSpout spout = _spouts.get(spoutId); //設定對應ComponentCommon物件的streams(輸出的欄位列表以及是否是直接流) ComponentCommon common = getComponentCommon(spoutId, spout); /** * 先將Spout物件序列化得到陣列,再建立SpoutSpec物件,所以所有在StormTopology中Spouts是物件序列化過後得到的位元組陣列. 
*/ spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); } //將上述所設定的所有元件都封裝到StormTopology物件中,最後提交到叢集中執行 return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>()); } /** * 下面幾個方法定義了setBolt方法以及它的過載方法 */ /** * 在這個topology中定義一個只有單執行緒並行度的新的bolt * 其它想要消耗這個bolt的輸出的元件會引用這個id */ public BoltDeclarer setBolt(String id, IRichBolt bolt) { return setBolt(id, bolt, null); } /** * 為這個topology定義一個指定數量的並行度的bolt */ public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) { //檢測傳入的元件id是否唯一 validateUnusedId(id); //生成common物件 initCommon(id, bolt, parallelism_hint); _bolts.put(id, bolt); return new BoltGetter(id); } public BoltDeclarer setBolt(String id, IBasicBolt bolt) { return setBolt(id, bolt, null); } public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) { /** * 該方法利用BasicBoltExecutor包裝(封裝)傳入的IBasicBolt物件 * 在BasicBoltExecutor中實現了對訊息的追蹤 */ return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint); } /** * 下面幾個方法定義了setSpout方法以及它的過載方法 */ public SpoutDeclarer setSpout(String id, IRichSpout spout) { return setSpout(id, spout, null); } public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) { //檢測輸入的id是否唯一,若已經存在將丟擲異常 validateUnusedId(id); /** * 構建ComponentCommon物件並進行相對應的初始化,最後放入到_commons(在上述中已經定義) */ initCommon(id, spout, parallelism_hint); _spouts.put(id, spout); return new SpoutGetter(id); } public SpoutDeclarer setSpout(String id, IControlSpout spout) { return setSpout(id, spout, null); } public SpoutDeclarer setSpout(String id, IControlSpout spout, Number parallelism_hint) { return setSpout(id, new ControlSpoutExecutor(spout), parallelism_hint); } public BoltDeclarer setBolt(String id, IControlBolt bolt, Number parallelism_hint) { return setBolt(id, new ControlBoltExecutor(bolt), parallelism_hint); } public BoltDeclarer setBolt(String id, IControlBolt bolt) { return setBolt(id, bolt, null); } public void 
setStateSpout(String id, IRichStateSpout stateSpout) { setStateSpout(id, stateSpout, null); } public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) { validateUnusedId(id); // TODO: finish } /** * 檢測輸入的id是否唯一 * @param id */ private void validateUnusedId(String id) { if (_bolts.containsKey(id)) { throw new IllegalArgumentException("Bolt has already been declared for id " + id); } if (_spouts.containsKey(id)) { throw new IllegalArgumentException("Spout has already been declared for id " + id); } if (_stateSpouts.containsKey(id)) { throw new IllegalArgumentException("State spout has already been declared for id " + id); } } private ComponentCommon getComponentCommon(String id, IComponent component) { ComponentCommon ret = new ComponentCommon(_commons.get(id)); OutputFieldsGetter getter = new OutputFieldsGetter(); component.declareOutputFields(getter); ret.set_streams(getter.getFieldsDeclaration()); return ret; } /** * 定義了initCommon方法,用來初始化變數CommonentCommon物件,並給類成員變數_commons賦值 * 初始化所做的工作:設定並行度還有一些其它配置 * @param id * @param component * @param parallelism */ private void initCommon(String id, IComponent component, Number parallelism) { ComponentCommon common = new ComponentCommon(); //設定訊息流的來源及分組方式 common.set_inputs(new HashMap<GlobalStreamId, Grouping>()); if (parallelism != null) { //設定並行度 common.set_parallelism_hint(parallelism.intValue()); } else { //如果並行度沒有手動設定則預設為1 common.set_parallelism_hint(1); } Map conf = component.getComponentConfiguration(); if (conf != null) //設定元件的配置引數 common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common); } }

從上面TopologyBuilder的類中可以看到這個類提供了建立StormTopology的方法以及一些資料來源節點和處理節點的相關設定的方法,還有就是儲存Bolt物件和Spout物件的方法,當然這裡關於分組的程式碼沒有寫出來。事實上這個類就是用來設定Spout節點和Bolt節點,並通過分組方式將Spout和Bolt節點連線起來形成拓撲結構的。