Storm Topology的生命週期——淺析(一)
作為一名storm的初學者,首先應該瞭解的就是storm是如何部署提交一個topology的。也就是說,當我們運行了命令:”storm jar myjarpath mytopologyclass args”之後,storm又是如何做的呢。我查閱了Apache Storm的官方文件,在此翻譯整理一下,並寫一點自己的理解。
官方連結:Lifecycle of a Storm Topology
[譯]Storm Topology的生命週期
(NOTE:這篇文章基於storm0.7.1,之後的版本有了許多變化,比如tasks 和 executors直接的劃分,原始碼的路徑由之前的src/變為storm-core/src等。)
這篇文章詳細介紹了我們執行storm jar命令之後一個Topology的生命週期:上傳Topology到Nimbus,Supervisor啟動/停止workers,workers和tasks的自我建立。還有Nimbus是如何監控Topology的,當執行kill命令時Topology是如何關閉的。
幾個重要nodes:
1、真正執行的topology與使用者自定義的topology不同,因為其加入了acker bolt 和 Streams。
2、真正的topology由system-topology!方法建立。該方法在Nimbus為topology建立tasks時 和worker 路由訊息時使用。
1、啟動一個Topology
- “storm jar”命令根據特定的引數執行你所提交的class。我們只確定”storm jar”命令通過StormSubmitter方法建立並設定了環境變數。
def jar(jarfile, klass, *args):
"""Syntax: [storm jar topology-jar-path class ...]
Runs the main method of class with the specified arguments.
The storm jars and configs in ~/.storm are put on the classpath.
The process is configured so that StormSubmitter
(http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
will upload the jar at topology-jar-path when the topology is submitted.
"""
exec_storm_class(
klass,
jvmtype="-client",
extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
args=args,
childopts="-Dstorm.jar=" + jarfile)
- 當你使用StormSubmitter.submitTopology提交Topology時,StormSubmitter做了如下幾件事:
- 首先,StormSubmitter通過Nimbus Thrift介面將jar包上傳到Nimbus(在之前沒有上傳過jar包的情況下)。
- beginFileUPload方法返回Nimbus inbox的一個路徑。(這個路徑應該就是jar包上傳到nimbus的路徑。)
- uploadChunk方法保證上傳速度為每次15KB。
- finishFileUpload方法在上傳結束時執行。
- 其次,StormSubmitter呼叫Nimbus Thrift介面的submitTopology方法。
- Topology的配置使用JSON進行序列化。
- submitTopology使用的Nimbus inbox path 就是jar被上傳到nimbus的位置。
- 首先,StormSubmitter通過Nimbus Thrift介面將jar包上傳到Nimbus(在之前沒有上傳過jar包的情況下)。
//submitter upload the jar
public static String submitJar(Map conf, String localJar) {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
String uploadLocation = client.getClient().beginFileUpload();
LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
BufferFileInputStream is = new BufferFileInputStream(localJar);
while(true) {
byte[] toSubmit = is.read();
if(toSubmit.length==0) break;
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
}
client.getClient().finishFileUpload(uploadLocation);
LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
return uploadLocation;
} catch(Exception e) {
throw new RuntimeException(e);
} finally {
client.close();
}
}
- Nimbus接收提交的Topology。code
- Nimbus規範化Topology的配置,其主要目的是確保每個單獨的task都有相同的序列化註冊碼,這對於獲得正確的序列化工作至關重要。code
- Nimbus為topology設定靜態狀態。code
- Jars和配置檔案configs都儲存在Nimbus本地檔案系統上(對於zookeeper來說太大了)。Jars和configs被拷貝到 {nimbus local dir}/stormdist/{topology id}路徑下。
- setup-storm-static方法將 task -> component 的對映寫入ZK。
- setup-heartbeats在ZK上建立一個目錄用於task的心跳反應。
- Nimbus呼叫mk-assignment方法將tasks分配給節點。該方法包含:
- master-code-dir:supervisor呼叫,以從Nimbus下載到Topology的jars/configs。
- task->node+port:首先它是一個Map,從 task id 到 worker(該task應該執行)。一個worker由node/port定義,node指定哪臺節點,port指定哪個程序。
- node->host:一個Map,從 node id 到 hostname。使得worker知道需要通訊的其他worker在哪臺節點上。Node id 用於區分supervisor,因此一個或多個supervisor可以執行在一臺節點上。
- task->start-time-secs:包含了一個Map,從 task id 到 timestamp(該時間戳是Nimbus釋出這個task的時間)。這個被Nimbus用來監控topologies,在tasks第一次被啟動時,它們被指定了較長的超時時間,該超時時間用於心跳檢測,並且可以由 “nimbus.task.launch.secs”定義。
;;mk-assignment方法
(defnk mk-assignments [nimbus storm-id :scratch? false]
(log-debug "Determining assignment for " storm-id)
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
callback (fn [& ignored] (transition! nimbus storm-id :monitor))
node->host (get-node->host storm-cluster-state callback)
existing-assignment (.assignment-info storm-cluster-state storm-id nil)
task->node+port (compute-new-task->node+port conf storm-id existing-assignment
storm-cluster-state callback
(:task-heartbeats-cache nimbus)
scratch?)
all-node->host (merge (:node->host existing-assignment) node->host)
reassign-ids (changed-ids (:task->node+port existing-assignment) task->node+port)
now-secs (current-time-secs)
start-times (merge (:task->start-time-secs existing-assignment)
(into {}
(for [id reassign-ids]
[id now-secs]
)))
assignment (Assignment.
(master-stormdist-root conf storm-id)
(select-keys all-node->host (map first (vals task->node+port)))
task->node+port
start-times
)
]
;; tasks figure out what tasks to talk to by looking at topology at runtime
;; only log/set when there's been a change to the assignment
(if (= existing-assignment assignment)
(log-debug "Assignment for " storm-id " hasn't changed")
(do
(log-message "Setting new assignment for storm id " storm-id ": " (pr-str assignment))
(.set-assignment! storm-cluster-state storm-id assignment)
))
))
- 一旦Topology被分配好了,它們便處於deactivated模式。start-storm將資料(什麼資料我還沒看)寫到ZK上,從而叢集知道該Topology已經處於active狀態了,便開始從spout連續不斷的發射tuples。
(defn- start-storm [storm-name storm-cluster-state storm-id]
(log-message "Activating " storm-name ": " storm-id)
(.activate-storm! storm-cluster-state
storm-id
(StormBase. storm-name
(current-time-secs)
{:type :active})))
- TODO叢集狀態圖(展示了所有的節點和它所包含的元件)。
- Supervisor在後臺運行了兩個方法:
Worker 通過mk-worker方法啟動。code
Tasks通過mk-task方法啟動。code
2、Topology的監控
- Nimbus在topology的整個生命週期內都對其進行監控。
- 建立一個迴圈執行的計時器執行緒(schedule-recurring)來監控topology。
- Nimbus的行為表示成一個有限狀態機。code
- Topology中的”monitor”事件每隔”nimbus.monitor.freq.secs”呼叫一次,該監控器通過reassign-transition方法呼叫reassign-topology方法。code
- reassign-topology呼叫 mk-assignments方法來(逐步遞增地)更新topology。
- mk-assignments檢測heartbeats 並且在必要時重新部署workers。
- 任何重新部署後的改變都將改變ZK中的狀態資訊,從而觸發supervisor的同步工作,並啟動/停止workers。
(schedule-recurring (:timer nimbus)
0
(conf NIMBUS-MONITOR-FREQ-SECS)
(fn []
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :monitor))
(do-cleanup nimbus)
))
3、(Kill)結束一個Topology
- “strom kill”命令通過Nimbus Thrift介面呼叫方法來kill一個topology。
(defn -main [& args]
(with-command-line args
"Kill a topology"
[[wait w "Override the amount of time to wait after deactivating before killing" nil]
posargs]
(let [name (first posargs)
opts (KillOptions.)]
(if wait (.set_wait_secs opts (Integer/parseInt wait)))
(with-configured-nimbus-connection nimbus
(.killTopologyWithOpts nimbus name opts)
(log-message "Killed topology: " name)
))))
- Nimbus 接收 kill命令。code
- Nimbus將該topology標記為kill transition狀態(個人理解)。code
- kill-transition方法將topology的狀態改為killed,並且呼叫remove事件執行”wait time seconds”。code
- wait time 預設和 topology message timeout一樣,也可以通過storm kill 命令中的引數 -w來修改。
- 該方法使得topology處於未啟用狀態(在wait time 之前),wait time結束後才真正關閉。這使得topology在真正結束之前有一段時間來處理目前正在進行的工作。
在 kill transition過程中改變狀態保證來kill協議是容錯的。在啟動時,如果一個topology的狀態是”killed”,那麼nimbus將會呼叫remove event來清除該topology。code
移除一個Topology時會清除儲存在ZK上的分配和狀態資訊。code
- 一個單獨cleanup的執行緒執行 do-cleanup方法,該方法會清除本地的 heartbeat目錄 和 儲存jars/configs目錄。code