1. 程式人生 > >Storm原始碼淺析之topology的提交

Storm原始碼淺析之topology的提交

最近一直在讀twitter開源的這個分散式流計算框架——storm的原始碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進行組織,並且只分析我關注的東西,因此稱之為淺析。        

一、介紹
    Storm的開發語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。原始碼統計結果:
180 text files.
     
177 unique files.                                          
       
7 files ignored.

http:
//cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Java                           
1255010241425661
Lisp                            
337322834871
Python                           
77424334675
CSS                              
112451837
ruby                             
2220104
Bourne Shell                     
1006
Javascript                       
21156-------------------------------------------------------------------------------
SUM:                           
1716519319037160-------------------------------------------------------------------------------
    Java程式碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。

二、Topology和Nimbus        
    Topology是storm的核心理念,將spout和bolt組織成一個topology,執行在storm叢集裡,完成實時分析和計算的任務。這裡我主要想介紹下topology部署到storm叢集的大概過程。提交一個topology任務到Storm叢集是通過StormSubmitter.submitTopology方法提交:
StormSubmitter.submitTopology(name, conf, builder.createTopology());     我們將topology打成jar包後,利用bin/storm這個python指令碼,執行如下命令:
bin/storm jar xxxx.jar com.taobao.MyTopology args     將jar包提交給storm叢集。storm指令碼會啟動JVM執行Topology的main方法,執行submitTopology的過程。而submitTopology會將jar檔案上傳到nimbus,上傳是通過socket傳輸。在storm這個python指令碼的jar方法裡可以看到:
def jar(jarfile, klass, *args):                                                                                                                               
   exec_storm_class(                                                                                                                                          
        klass,                                                                                                                                                
        jvmtype
="-client",                                                                                                                                    
        extrajars
=[jarfile, CONF_DIR, STORM_DIR +"/bin"],                                                                                                    
        args
=args,                                                                                                                                            
        prefix
="export STORM_JAR="+ jarfile +";") 
     將jar檔案的地址設定為環境變數STORM_JAR,這個環境變數在執行submitTopology的時候用到:
//StormSubmitter.java privatestaticvoid submitJar(Map conf) {
        
if(submittedJar==null) {
            LOG.info(
"Jar not uploaded to master yet. Submitting jar");
            String localJar 
= System.getenv("STORM_JAR");
            submittedJar 
= submitJar(conf, localJar);
        } 
else {
            LOG.info(
"Jar already uploaded to master. Not submitting jar.");
        }
    }
    通過環境變數找到jar包的地址,然後上傳。利用環境變數傳參是個小技巧。

    其次,nimbus在接收到jar檔案後,存放到資料目錄的inbox目錄,nimbus資料目錄的結構
-nimbus
     
-inbox
         
-stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
         
-stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

     
-stormdist
        
-storm-id
           
-stormjar.jar
           
-stormconf.ser
           
-stormcode.ser      其中inbox用於存放提交的jar檔案,每個jar檔案都重新命名為stormjar加上一個32位的UUID。而stormdist存放的是啟動topology後生成的檔案,每個topology都分配一個唯一的id,ID的規則是“name-計數-時間戳”。啟動後的topology的jar檔名命名為storm.jar ,而它的配置經過java序列化後存放在stormconf.ser檔案,而stormcode.ser是將topology本身序列化後存放的檔案。這些檔案在部署的時候,supervisor會從這個目錄下載這些檔案,然後在supervisor本地執行這些程式碼。
    進入重點,topology任務的分配過程(zookeeper路徑說明忽略root):
1.在zookeeper上建立/taskheartbeats/{storm id} 路徑,用於任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節點做存活檢測。task將定時重新整理節點的時間戳,然後nimbus會檢測這個時間戳是否超過timeout設定。
2.從topology中獲取bolts,spouts設定的並行數目以及全域性配置的最大並行數,然後產生task id列表,如[1 2 3 4]
3.在zookeeper上建立/tasks/{strom id}/{task id}路徑,並存儲task資訊
4.開始分配任務(內部稱為assignment), 具體步驟:
 (1)從zk上獲得已有的assignment(新的toplogy當然沒有了)
 (2)查詢所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的埠。
 (3)將任務均勻地分配給可用的worker,這裡有兩種情況:
 (a)task數目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那麼最終是這樣分配
{1: [host1:port1] 2 : [host2:port1]
         
3 : [host1:port1] 4 : [host2:port1]} ,可以看到任務平均地分配在兩個worker上。
(b)如果task數目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那麼首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個worker上,也就是將worker排列成
[host1:port1 host2:port1 host1:port2 host2:port2] ,然後分配任務為
{1: host1:port1 , 2 : host2:port2}
(4)記錄啟動時間
(5)判斷現有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應的資料裡的active設定為true。
6.nimbus會檢查task的心跳,如果發現task心跳超過超時時間,那麼會重新跳到第4步做re-assignment。