1. 程式人生 > >storm如何分配任務和負載均衡?

storm如何分配任務和負載均衡?

角色 包含 first art olt 組件 viso ons 需要

  這裏做一些補充:
  
  1. worker是一個進程,由supervisor啟動,並只負責處理一個topology,所以不會同時處理多個topology.
  
  2. executor是一個線程,由worker啟動,是運行task的物理容器,其和task是1 -> N關系.
  
  3. component是對spout/bolt/acker的抽象.
  
  4. task也是對spout/bolt/acker的抽象,不過是計算了並行度之後。component和task是1 -> N 的關系.
  
  supervisor會定時從zookeeper獲取topologies、已分配的任務分配信息assignments及各類心跳信息,以此為依據進行任務分配。
  
  在supervisor周期性地進行同步時,會根據新的任務分配來啟動新的worker或者關閉舊的worker,以響應任務分配和負載均衡。
  
  worker通過定期的更新connections信息,來獲知其應該通訊的其它worker。
  
  worker啟動時,會根據其分配到的任務啟動一個或多個executor線程。這些線程僅會處理唯一的topology。
  
  executor線程負責處理多個spouts或者多個bolts的邏輯,這些spouts或者bolts,也稱為tasks。
  
  並行度的計算
  
  相關配置及參數的意義
  
  具體有多少個worker,多少個executor,每個executor負責多少個task,是由配置和指定的parallelism-hint共同決定的,但指定的並行度並不一定等於實際運行中的數目。
  
  1、TOPOLOGY-WORKERS參數指定了某個topology運行時需啟動的worker數目
  
  2、parallelism-hint指定某個component(組件,如spout)的初始executor的數目
  
  3、TOPOLOGY-TASKS是component的tasks數,計算稍微復雜點:
  
  (1) 如果未指定TOPOLOGY-TASKS,此值等於初始executors數.
  
  (2) 如果已指定,和TOPOLOGY-MAX-TASK-PARALLELISM值進行比較,取小的那個作為實際的TOPOLOGY-TASKS.
  
  用代碼來表達就是:
  
  (defn- component-parallelism [storm-conf component]
  
  (let [storm-conf (merge storm-conf (component-conf component))
  
  num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
  
  max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
  
  ]
  
  (if max-parallelism
  
  (min max-parallelism num-tasks)
  
  num-tasks)))
  
  4、對於acker這種特殊的bolt來說,其並行度計算如下:
  
  (1) 如果指定了TOPOLOGY-ACKER-EXECUTORS,按這個值計算.
  
  (2) 如果未指定,那麽按TOPOLOGY-WORKERS的值來設置並行度,這種情況下,一個acker對應一個worker,顯然,在計算任務繁重、數據量比較大的情況下,這是不合適的。
  
  5、如果配置了NIMBUS-SLOTS-PER-TOPOLOGY,在提交topology到nimbus時,會驗證topology所需的worker總數,如果超過了這個值,說明不能夠滿足需求,則拋出異常。
  
  6、如果配置了NIMBUS-EXECUTORS-PER-TOPOLOGY,如第5點,會驗證topology所需的executor總數,如果超出,也會拋出異常。
  
  同時,需要註意,實際運行中,有可能出現並行的TASKS數小於指定的數量。
  
  通過調用nimbus接口的rebalance或者do-rebalance操作,以上並行度可被動態改變。
  
  並行度計算在任務分配中的體現
  
  先回顧下任務分配中的幾個主要角色:
  
  接著看幾段重要的並行度計算代碼:
  
  1、計算所有topology的topology-id到executors的映射關系:
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  ; 計算所有tolopogy的topology-id到executors的映射
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  (defn- compute-topology->executors [nimbus storm-ids]
  
  "compute a topology-id -> executors map"
  
  (into {} (for [tid storm-ids]
  
  {tid (set (compute-executors nimbus tid))})))
  
  2、計算topology-id到executors的映射信息:
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  ; 計算topology-id到executors的映射
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  (defn- compute-executors [nimbus storm-id]
  
  (let [conf (:conf nimbus)
  
  storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
  
  component->executors (:component->executors storm-base)
  
  storm-conf (read-storm-conf conf storm-id)
  
  topology (read-storm-topology conf storm-id)
  
  task->component (storm-task-info topology storm-conf)]
  
  (->> www.thd729.com (storm-task-info topology storm-conf)
  
  reverse-map
  
  (map-val sort)
  
  (join-maps component->executors)
  
  (map-val www.dfgj729.com (partial apply partition-fixed))
  
  (mapcat second)
  
  (map to-executor-id)
  
  )))
  
  3、計算topology的任務信息 task-info,這裏TOPOLOGY-TASKS就決定了每個組件component(spout、bolt)的並行度,或者說tasks數:
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  ; 計算topology的task-info
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  (defn storm-task-info
  
  "Returns map from task -> component id"
  
  [^StormTopology user-topology storm-conf]
  
  (->> (system-topology! storm-conf user-topology)
  
  all-components
  
  ;; 獲取每個組件的並行數
  
  (map-val (comp #(get www.douniu828.com% TOPOLOGY-TASKS) component-conf))
  
  (sort-by first)
  
  (mapcat (fn [[c num-tasks]] (repeat num-tasks c)www.fengshen157.com))
  
  (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
  
  (into {})
  
  ))
  
  4、上述1、2、3段代碼會在nimbus進行任務分配時調用,任務分配是通過mk-assignments函數來完成,調用過程用偽代碼描述如下:
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  ; nimbus進行任務分配
  
  ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  
  mk-assignments
  
  ;; 這一步計算topology的所有executor對應的node + port信息
  
  ->compute-new-topology->executor->node+port
  
  ->compute-topology->executors
  
  -> ...
  
  nimbus進行任務分配
  
  這裏回顧並補充下nimbus進行任務分配的主要流程:
  
  任務分配的流程
  
  1、nimbus將一組node + port 稱為worker-slot,由executor到worker-slot的映射信息,就決定executor將在哪臺機器、哪個worker進程運行,隨之spout、bolt、acker等位置也就確定了,如下圖所示:
  
  2、 nimbus是整個集群的控管核心,總體負責了topology的提交、運行狀態監控、負載均衡及任務分配等工作。
  
  3、nimbus分配的任務包含了topology代碼所在的路徑(在nimbus本地)、tasks、executors及workers信息。worker由node + port及配置的worker數量來唯一確定。
  
  任務信息Assignment結構如下:
  
  4、supervisor負責實際的同步worker的操作。一個supervisor稱為一個node。所謂同步worker,是指響應nimbus的任務分配,進行worker的建立、調度與銷毀。
  
  在收到任務時,如果相關的topology代碼不在本地,supervisor會從nimbus下載代碼並寫入本地文件。
  
  5、 通過node、port、host信息的計算,worker就知道和哪些機器進行通訊,而當負載均衡發生、任務被重新分配時,這些機器可能發生了變化,worker會通過周期性的調用refresh-connections來獲知變化,並進行新連接的建立、廢棄連接的銷毀等工作,如下圖所示:
  
  任務分配的依據
  
  supervisor、worker、executor等組件的心跳信息會同步至zookeeper,nimbus會周期性地獲取這些信息,結合已分配的任務信息assignments、集群現有的topologies(已運行+未運行)等等信息,來進行任務分配,如下圖所示:
  
  任務分配的時機
  
  1、通過rebalance和do-reblalance(比如來自web調用)觸發負載均衡,會觸發mk-assignments即任務分配。
  
  2、同時,nimbus進程啟動後,會周期性地進行任務分配。
  
  3、客戶端通過 storm jar ... topology 方式提交topology,會通過thrift調用nimbus接口,提交topology,啟動新storm實例,並觸發任務分配。
  
  負載均衡
  
  負載均衡和任務分配是連在一起的,或者說任務分配中所用到的關鍵信息是由負載均衡來主導計算的,上文已經分析了任務分配的主要角色和流程,那麽負載均衡理解起來就很容易了,流程和框架如下圖所示:
  
  其中,負載均衡部分的策略可采用平均分配、機器隔離或topology隔離後再分配、Round-Robin等等,因為主要討論storm的基礎框架,而具體的負載均衡策略各家都不一樣,而且這個策略是完全可以自定義的,比如可以將機器的實際能力如CPU、磁盤、內存、網絡等等資源抽象為一個一個的資源slot,以此slot為單位進行分配,等等。
  
  這裏就不深入展開了。
  
  通過負載均衡得出了新的任務分配信息assignments,nimbus再進行一些轉換計算,就會將信息同步到zookeeper上,supervisor就可以根據這些信息來同步worker了。
  
  結語
  
  本篇作為對上篇的補充和完善。
  
  也完整地回答了這個問題:
  
  在Topology中我們可以指定spout、bolt的並行度,在提交Topology時Storm如何將spout、bolt自動發布到每個服務器並且控制服務的CPU、磁盤等資源的?
  
  終。
  
  相關閱讀
  
  storm的基礎框架分析:http://www.cnblogs.com/foreach-break/p/storm_worker_executor_spout_bolt_simbus_supervisor_mk-assignments.html

storm如何分配任務和負載均衡?