1. 程式人生 > 程式設計 >Storm 系列(二)—— Storm 核心概念詳解

Storm 系列(二)—— Storm 核心概念詳解

一、Storm核心概念

https://github.com/heibaiying

1.1 Topologies(拓撲)

一個完整的 Storm 流處理程式被稱為 Storm topology(拓撲)。它是一個是由 SpoutsBolts 通過 Stream 連線起來的有向無環圖,Storm 會保持每個提交到叢集的 topology 持續地執行,從而處理源源不斷的資料流,直到你將主動其殺死 (kill) 為止。

1.2 Streams(流)

Stream 是 Storm 中的核心概念。一個 Stream 是一個無界的、以分散式方式並行建立和處理的 Tuple 序列。Tuple 可以包含大多數基本型別以及自定義型別的資料。簡單來說,Tuple 就是流資料的實際載體,而 Stream 就是一系列 Tuple。

1.3 Spouts

Spouts 是流資料的源頭,一個 Spout 可以向不止一個 Streams 中傳送資料。Spout 通常分為可靠不可靠兩種:可靠的 Spout 能夠在失敗時重新傳送 Tuple,不可靠的 Spout 一旦把 Tuple 傳送出去就置之不理了。

1.4 Bolts

Bolts 是流資料的處理單元,它可以從一個或者多個 Streams 中接收資料,處理完成後再發射到新的 Streams 中。Bolts 可以執行過濾 (filtering),聚合 (aggregations),連線 (joins) 等操作,並能與檔案系統或資料庫進行互動。

1.5 Stream groupings(分組策略)

https://github.com/heibaiying

spoutsbolts 在叢集上執行任務時,是由多個 Task 並行執行 (如上圖,每一個圓圈代表一個 Task)。當一個 Tuple 需要從 Bolt A 傳送給 Bolt B 執行的時候,程式如何知道應該傳送給 Bolt B 的哪一個 Task 執行呢?

這是由 Stream groupings 分組策略來決定的,Storm 中一共有如下 8 個內建的 Stream Grouping。當然你也可以通過實現 CustomStreamGrouping 介面來實現自定義 Stream 分組策略。

  1. Shuffle grouping

    Tuples 隨機的分發到每個 Bolt 的每個 Task 上,每個 Bolt 獲取到等量的 Tuples。

  2. Fields grouping

    Streams 通過 grouping 指定的欄位 (field) 來分組。假設通過 user-id 欄位進行分割槽,那麼具有相同 user-id 的 Tuples 就會傳送到同一個 Task。

  3. Partial Key grouping

    Streams 通過 grouping 中指定的欄位 (field) 來分組,與 Fields Grouping 相似。但是對於兩個下游的 Bolt 來說是負載均衡的,可以在輸入資料不平均的情況下提供更好的優化。

  4. All grouping

    Streams 會被所有的 Bolt 的 Tasks 進行復制。由於存在資料重複處理,所以需要謹慎使用。

  5. Global grouping

    整個 Streams 會進入 Bolt 的其中一個 Task,通常會進入 id 最小的 Task。

  6. None grouping

    當前 None grouping 和 Shuffle grouping 等價,都是進行隨機分發。

  7. Direct grouping

    Direct grouping 只能被用於 direct streams 。使用這種方式需要由 Tuple 的生產者直接指定由哪個 Task 進行處理。

  8. Local or shuffle grouping

    如果目標 Bolt 有 Tasks 和當前 Bolt 的 Tasks 處在同一個 Worker 程式中,那麼則優先將 Tuple Shuffled 到處於同一個程式的目標 Bolt 的 Tasks 上,這樣可以最大限度地減少網路傳輸。否則,就和普通的 Shuffle Grouping 行為一致。

二、Storm架構詳解

https://github.com/heibaiying

2.1 Nimbus程式

也叫做 Master Node,是 Storm 叢集工作的全域性指揮官。主要功能如下:

  1. 通過 Thrift 介面,監聽並接收 Client 提交的 Topology;
  2. 根據叢集 Workers 的資源情況,將 Client 提交的 Topology 進行任務分配,分配結果寫入 Zookeeper;
  3. 通過 Thrift 介面,監聽 Supervisor 的下載 Topology 程式碼的請求,並提供下載 ;
  4. 通過 Thrift 介面,監聽 UI 對統計資訊的讀取,從 Zookeeper 上讀取統計資訊,返回給 UI;
  5. 若程式退出後,立即在本機重啟,則不影響叢集執行。

2.2 Supervisor程式

也叫做 Worker Node,是 Storm 叢集的資源管理者,按需啟動 Worker 程式。主要功能如下:

  1. 定時從 Zookeeper 檢查是否有新 Topology 程式碼未下載到本地 ,並定時刪除舊 Topology 程式碼 ;
  2. 根據 Nimbus 的任務分配計劃,在本機按需啟動 1 個或多個 Worker 程式,並監控所有的 Worker 程式的情況;
  3. 若程式退出,立即在本機重啟,則不影響叢集執行。

2.3 zookeeper的作用

Nimbus 和 Supervisor 程式都被設計為快速失敗(遇到任何意外情況時程式自毀)和無狀態(所有狀態儲存在 Zookeeper 或磁碟上)。 這樣設計的好處就是如果它們的程式被意外銷燬,那麼在重新啟動後,就只需要從 Zookeeper 上獲取之前的狀態資料即可,並不會造成任何資料丟失。

2.4 Worker程式

Storm 叢集的任務構造者 ,構造 Spoult 或 Bolt 的 Task 例項,啟動 Executor 執行緒。主要功能如下:

  1. 根據 Zookeeper 上分配的 Task,在本程式中啟動 1 個或多個 Executor 執行緒,將構造好的 Task 例項交給 Executor 去執行;
  2. 向 Zookeeper 寫入心跳 ;
  3. 維持傳輸佇列,傳送 Tuple 到其他的 Worker ;
  4. 若程式退出,立即在本機重啟,則不影響叢集執行。

2.5 Executor執行緒

Storm 叢集的任務執行者 ,迴圈執行 Task 程式碼。主要功能如下:

  1. 執行 1 個或多個 Task;
  2. 執行 Acker 機制,負責傳送 Task 處理狀態給對應 Spout 所在的 worker。

2.6 並行度

https://github.com/heibaiying

1 個 Worker 程式執行的是 1 個 Topology 的子集,不會出現 1 個 Worker 為多個 Topology 服務的情況,因此 1 個執行中的 Topology 就是由叢集中多臺物理機上的多個 Worker 程式組成的。1 個 Worker 程式會啟動 1 個或多個 Executor 執行緒來執行 1 個 Topology 的 Component(元件,即 Spout 或 Bolt)。

Executor 是 1 個被 Worker 程式啟動的單獨執行緒。每個 Executor 會執行 1 個 Component 中的一個或者多個 Task。

Task 是組成 Component 的程式碼單元。Topology 啟動後,1 個 Component 的 Task 數目是固定不變的,但該 Component 使用的 Executor 執行緒數可以動態調整(例如:1 個 Executor 執行緒可以執行該 Component 的 1 個或多個 Task 例項)。這意味著,對於 1 個 Component 來說,#threads<=#tasks(執行緒數小於等於 Task 數目)這樣的情況是存在的。預設情況下 Task 的數目等於 Executor 執行緒數,即 1 個 Executor 執行緒只執行 1 個 Task。

總結如下:

  • 一個執行中的 Topology 由叢集中的多個 Worker 程式組成的;
  • 在預設情況下,每個 Worker 程式預設啟動一個 Executor 執行緒;
  • 在預設情況下,每個 Executor 預設啟動一個 Task 執行緒;
  • Task 是組成 Component 的程式碼單元。

參考資料

  1. storm documentation -> Concepts

  2. Internal Working of Apache Storm

  3. Understanding the Parallelism of a Storm Topology

  4. Storm nimbus 單節點宕機的處理

更多大資料系列文章可以參見 GitHub 開源專案大資料入門指南