Jstorm原始碼分析--kill、rebanlance、activate、deactivate方法流程
阿新 • • 發佈:2019-01-28
這幾個kill、rebanlance、activate、deactivate方法放在一起分析,是因為他們都依賴於nimbus定義的通用的 狀態轉移方法:
transitionName
********************************************這裡只具體分析kill方法*********************************
二:LocalCluster完成構造之後,即可使用killTopology/ killTopologyWithOpts殺死拓撲。
方法的實現都會呼叫nimbus的對應方法。追蹤nimbus的實現類,就能看到killTopology/ killTopologyWithOpts這兩個方法,真正的實現類就是killTopologyWithOpts
@Override
public void killTopology(String name) throws NotAliveException, TException {
killTopologyWithOpts(name, new KillOptions());
}
跟蹤killTopologyWithOpts方法,方法定義如下:
public void killTopologyWithOpts(String topologyName, KillOptions options)
可以看到需要兩個引數,分別是拓撲的名稱和kill的操作。
@Override public void killTopologyWithOpts(String topologyName, KillOptions options) throws NotAliveException, TException { try { checkTopologyActive(data, topologyName, true); Integer wait_amt = null; //檢視是否設定了殺死拓撲的等待時間,如果設定了就等待 if (options.is_set_wait_secs()) { wait_amt = options.get_wait_secs(); } // NimbusUtils.transitionName(data, topologyName, true, StatusType.kill, wait_amt); } catch (NotAliveException e) { String errMsg = "KillTopology Error, no this topology " + topologyName; LOG.error(errMsg, e); throw new NotAliveException(errMsg); } catch (Exception e) { String errMsg = "Failed to kill topology " + topologyName; LOG.error(errMsg, e); throw new TException(errMsg); } }
流程:1,先檢視拓撲是否Active,
public void checkTopologyActive(NimbusData nimbus, String topologyName, boolean bActive) throws Exception { if (isTopologyActive(nimbus.getStormClusterState(), topologyName) != bActive) { if (bActive) { throw new NotAliveException(topologyName + " is not alive"); } else { throw new AlreadyAliveException(topologyName + " is already active"); } } }
跟下去
public boolean isTopologyActive(StormClusterState stormClusterState,
String topologyName) throws Exception {
boolean rtn = false;
if (Cluster.get_topology_id(stormClusterState, topologyName) != null) {
rtn = true;
}
return rtn;
}
2:檢視是否設定了kill操作是否設定了等待時間,如果設定了就等待後再執行
3:transitionName:這是在nimbus定義的通用的 狀態轉移方法,,他們會根據傳入的轉移事件做相應的處理。
public static <T> void transitionName(NimbusData data, String topologyName,
boolean errorOnNoTransition, StatusType transition_status,
T... args) throws Exception {
//獲得nimbus的資訊,通過getStormClusterState查詢對應的storm-id(實際上就是將storm-name 轉換為storm-id的狀態),如果找到了,就呼叫transition方法。
StormClusterState stormClusterState = data.getStormClusterState();
String topologyId = Cluster.get_topology_id(stormClusterState,
topologyName);
if (topologyId == null) {
throw new NotAliveException(topologyName);
}
//開始執行kill操作,轉變拓撲狀態。
transition(data, topologyId, errorOnNoTransition, transition_status,
args);
}
</pre><p><span style="white-space:pre"></span></p><pre name="code" class="java">public static <T> void transitionName(NimbusData data, String topologyName,
boolean errorOnNoTransition, StatusType transition_status,
T... args) throws Exception {
//通過nimbus的資訊,獲得storm客戶端的狀態。
StormClusterState stormClusterState = data.getStormClusterState();
//通過cluster的狀態和拓撲的名稱獲得拓撲的ID。
String topologyId = Cluster.get_topology_id(stormClusterState,
topologyName);
if (topologyId == null) {
throw new NotAliveException(topologyName);
}
//開始執行kill操作,轉變拓撲狀態
transition(data, topologyId, errorOnNoTransition, transition_status,
args);
}
4:transition:這是在nimbus定義的通用的 狀態轉移方法
transition方法會根據傳入的轉移事件,獲取與當前Topology狀態對應的狀態轉移函式,並且執行該函式取得轉換後的新狀態。
public static <T> void transition(NimbusData data, String topologyid,
boolean errorOnNoTransition, StatusType transition_status,
T... args) {
try {
//轉變拓撲的狀態
data.getStatusTransition().transition(topologyid,
errorOnNoTransition, transition_status, args);
} catch (Exception e) {
// TODO Auto-generated catch block
LOG.error("Failed to do status transition,", e);
}
}
*********
最後一步追蹤trantion有一步topologyLocks還沒看太明白%>_<%
後續更新