Storm/JStorm之TopologyBuilder原始碼閱讀
阿新 • • 發佈:2019-01-23
在Storm/JStorm中有一個特別重要的類,主要用來構建Topology,這個類就是TopologyBuilder。
咱先看一下簡單的例子:
/**
 * Builds a two-component topology (one spout feeding one bolt) and submits it
 * to an in-process local cluster.
 *
 * @param args command-line arguments (unused)
 * @throws AlreadyAliveException declared by the original example; see LocalCluster.submitTopology
 * @throws InvalidTopologyException declared by the original example; see LocalCluster.submitTopology
 */
public static void main(String[] args) throws AlreadyAliveException,
        InvalidTopologyException {
    TopologyBuilder builder = new TopologyBuilder();
    // Register the spout under the id "input" with a parallelism hint of 2.
    builder.setSpout("input", new RandomSentenceSpout(), 2);
    // Register the bolt and connect it to the "input" spout with shuffle grouping.
    builder.setBolt("bolt_sentence", new SplitSentenceBolt(), 2)
            .shuffleGrouping("input");

    // Topology configuration passed to submitTopology. The original snippet
    // used `conf` without ever declaring it, so it could not compile.
    // NOTE(review): an empty map is used here; Storm's Config class is the
    // usual choice when specific settings are needed - confirm for your setup.
    Map<String, Object> conf = new HashMap<String, Object>();

    // Local mode: mainly used for debugging.
    LocalCluster cluster = new LocalCluster();
    System.out.println("start wordcount");
    cluster.submitTopology("word count", conf, builder.createTopology());
}
在上面的main方法裡先建立TopologyBuilder物件,然後設定好已建立的Spout節點和Bolt節點,並用隨機分組(shuffleGrouping)將Spout和Bolt節點連線起來形成Topology。那TopologyBuilder是如何做的呢?請看下面TopologyBuilder原始碼:
/**
* TopologyBuilder是一個用於構建Topology的工具類
*
*/
public class TopologyBuilder {
/**
* 定義了類成員變數_bolts,用來存放IRichBolt型別的所有Bolt物件
*/
private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
/**
* 定義了類成員變數_spouts,用來存放IRichSpout型別的所有Spout物件
*/
private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
/**
* 定義了類成員變數_commons,存放了所有的Bolt和Spout物件
*/
private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
// private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
/**
* 根據傳入的Bolt和Spout物件構建StormTopology物件
* @return
*/
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
for (String boltId : _bolts.keySet()) {
//根據boltId從_bolts中獲取到對應的bolt物件
IRichBolt bolt = _bolts.get(boltId);
//設定對應ComponentCommon物件的streams(輸出的欄位列表以及是否是直接流)屬性值
ComponentCommon common = getComponentCommon(boltId, bolt);
/**
* 先將Bolts物件序列化得到陣列,再建立Bolt物件,所以所有在StormTopology中Bolts是物件序列化過後得到的位元組陣列.
*/
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
}
for (String spoutId : _spouts.keySet()) {
//根據spoutId從_spouts中獲取到對應的spout物件
IRichSpout spout = _spouts.get(spoutId);
//設定對應ComponentCommon物件的streams(輸出的欄位列表以及是否是直接流)
ComponentCommon common = getComponentCommon(spoutId, spout);
/**
* 先將Spout物件序列化得到陣列,再建立SpoutSpec物件,所以所有在StormTopology中Spouts是物件序列化過後得到的位元組陣列.
*/
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
}
//將上述所設定的所有元件都封裝到StormTopology物件中,最後提交到叢集中執行
return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
}
/**
* 下面幾個方法定義了setBolt方法以及它的過載方法
*/
/**
* 在這個topology中定義一個只有單執行緒並行度的新的bolt
* 其它想要消耗這個bolt的輸出的元件會引用這個id
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt) {
return setBolt(id, bolt, null);
}
/**
* 為這個topology定義一個指定數量的並行度的bolt
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
//檢測傳入的元件id是否唯一
validateUnusedId(id);
//生成common物件
initCommon(id, bolt, parallelism_hint);
_bolts.put(id, bolt);
return new BoltGetter(id);
}
public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
return setBolt(id, bolt, null);
}
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
/**
* 該方法利用BasicBoltExecutor包裝(封裝)傳入的IBasicBolt物件
* 在BasicBoltExecutor中實現了對訊息的追蹤
*/
return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
/**
* 下面幾個方法定義了setSpout方法以及它的過載方法
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout) {
return setSpout(id, spout, null);
}
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
//檢測輸入的id是否唯一,若已經存在將丟擲異常
validateUnusedId(id);
/**
* 構建ComponentCommon物件並進行相對應的初始化,最後放入到_commons(在上述中已經定義)
*/
initCommon(id, spout, parallelism_hint);
_spouts.put(id, spout);
return new SpoutGetter(id);
}
public SpoutDeclarer setSpout(String id, IControlSpout spout) {
return setSpout(id, spout, null);
}
public SpoutDeclarer setSpout(String id, IControlSpout spout, Number parallelism_hint) {
return setSpout(id, new ControlSpoutExecutor(spout), parallelism_hint);
}
public BoltDeclarer setBolt(String id, IControlBolt bolt, Number parallelism_hint) {
return setBolt(id, new ControlBoltExecutor(bolt), parallelism_hint);
}
public BoltDeclarer setBolt(String id, IControlBolt bolt) {
return setBolt(id, bolt, null);
}
public void setStateSpout(String id, IRichStateSpout stateSpout) {
setStateSpout(id, stateSpout, null);
}
public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallelism_hint) {
validateUnusedId(id);
// TODO: finish
}
/**
* 檢測輸入的id是否唯一
* @param id
*/
private void validateUnusedId(String id) {
if (_bolts.containsKey(id)) {
throw new IllegalArgumentException("Bolt has already been declared for id " + id);
}
if (_spouts.containsKey(id)) {
throw new IllegalArgumentException("Spout has already been declared for id " + id);
}
if (_stateSpouts.containsKey(id)) {
throw new IllegalArgumentException("State spout has already been declared for id " + id);
}
}
private ComponentCommon getComponentCommon(String id, IComponent component) {
ComponentCommon ret = new ComponentCommon(_commons.get(id));
OutputFieldsGetter getter = new OutputFieldsGetter();
component.declareOutputFields(getter);
ret.set_streams(getter.getFieldsDeclaration());
return ret;
}
/**
* 定義了initCommon方法,用來初始化變數CommonentCommon物件,並給類成員變數_commons賦值
* 初始化所做的工作:設定並行度還有一些其它配置
* @param id
* @param component
* @param parallelism
*/
private void initCommon(String id, IComponent component, Number parallelism) {
ComponentCommon common = new ComponentCommon();
//設定訊息流的來源及分組方式
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if (parallelism != null) {
//設定並行度
common.set_parallelism_hint(parallelism.intValue());
} else {
//如果並行度沒有手動設定則預設為1
common.set_parallelism_hint(1);
}
Map conf = component.getComponentConfiguration();
if (conf != null)
//設定元件的配置引數
common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
}
從上面TopologyBuilder的類中可以看到,這個類提供了建立StormTopology的方法、設定資料來源節點(Spout)和處理節點(Bolt)的方法,並在內部儲存這些Bolt物件和Spout物件;當然,這裡關於分組的程式碼(即setBolt/setSpout所返回的BoltGetter、SpoutGetter)沒有寫出來。事實上這個類就是用來設定Spout節點和Bolt節點,並通過分組方式將Spout和Bolt節點連線起來形成拓撲結構的。