Storm初級-概述、併發控制基礎
概述
Storm是一個分散式的實時計算框架,具有可擴充套件,容錯等特性。可以應用於實時計算,線上機器學習等領域。
Strom的處理速度最快可以到達毫秒級別,QPS(Query Per Second) 達到9-10萬,而JStorm QPS達到11-12萬,另外還有Spark Streaming。
優勢:
- 處理速度快:QPS 9-10萬,每個節點每秒可以處理100萬個資料元組
- 細粒度處理:可以逐個tuple處理
應用場景:
語音實時牆:將使用者登入的地點實時顯示在地圖上,資料量為每天一億,每秒峰值20000,要求系統具備高可靠性,某些單點出現問題不能對服務造成影響,資料落地到資料展示的時延在30s內。
網路流量流向實時分析:通過Storm實時分析網路流量流向,並將實時統計反映在前端頁面的圖表中以備查詢。
基於GPS的實時路況分析:基於GPS資料,通過Storm可以做實時路況分析系統。實時路況能實時反映區域內交通路況,指引最佳、最快捷的行駛路線,提高道路和車輛的使用效率。
關聯概念
實時流:可以實時到達,被實時處理的資料流
實時流計算:用於實時動態處理大量、快速、時變的資料流。
實時計算處理流程:
資料實時採集:
- 需求:功能上實時採集完整的資料;響應時間上保證實時性、低延遲;配置簡單,部署容易;系統穩定可靠
- 相關產品(每秒數百MB):Scribe(Facebook)、Kafka(LinkdeIn)、Flume(Cloudera)、TimeTunnel(淘寶)、Chukwa(Hadoop)
資料實時計算:
- 需求:適應流式資料、不間斷查詢、系統穩定可靠、可拓展性好、可維護性好
- 工作模式:系統主動分析處理資料、使用者處於被動接收狀態(離線處理相反)
資料實時查詢:
實時查詢服務分為全記憶體、半記憶體、全磁碟三種:
- 全記憶體:直接提供資料讀取服務,定期轉存到磁碟或資料庫進行持久化。
- 半記憶體:使用Redis、Memcache、MongoDB、BerkeleyDB等記憶體資料庫提供資料實時查詢服務,由這些系統進行持久化操作。
- 全磁碟:使用HBase等以分散式檔案系統(HDFS)為基礎的NoSQL資料庫,對於KeyValue記憶體引擎,關鍵是設計好Key的分佈。
核心元件
Storm結構稱為topology(拓撲),由Stream(資料流),Spout(噴嘴-資料流的生成者),Bolt(閥門-資料流運算者)組成(參考圖:Storm組成結構)。
1 Stream
Stream是由無限制連續的tuple組成的序列,tuple是Storm最核心的資料結構,每個Tuple都是包含了一個或者多個鍵值對的列表。
2 Spout
Spout負責連線資料來源,接收資料,轉換為tuple作為資料流向後傳送,只負責轉化資料,不負責資料處理。
3 Bolt
負責接收資料,執行運算,運算過後可以繼續向後傳送tuple,給其他零個或多個Bolt。也可以理解為計算程式中的運算或者函式,將一個或者多個數據流作為輸入,對資料實施運算後,選擇性地輸出一個或者多個數據流。bolt可以訂閱多個由spout或者其他bolt發射的資料流,這樣就可以建立複雜的資料流轉換網路。
併發控制
訊息流分組
訊息流是Storm中最關鍵的抽象,是一個沒有邊界的Tuple序列,這些Tuple以分散式的方式並行地建立和處理。定義訊息流主要是定義訊息流中的Tuple。每個訊息流在定義時都會分配一個ID,因為單向訊息流很普遍,OutputFieldsDeclarer定義了一些方法可以定義一個流而不用指定其ID。在這種情況下,該流有一個預設的ID。
StreamGrouping訊息流組
定義Topology的其中一步,就是定義每個Bolt接受何種流作為輸入。StreamGrouping(訊息流組)就是用來定義一個流如何分配Tuple到Bolt。Storm包括7種流分組型別。
-
Shuffle Grouping 隨機分組
隨機分發元組到Bolt,並保證每個Bolt獲得相等數量的元組。
-
Fields Grouping 欄位分組
根據指定欄位分割資料流並分組。
-
All Grouping 全複製分組
對於每一個Tuple來說,所有的Bolt都會收到,所有的Tuple被複制到Bolt的所有任務上,需小心使用該分組。
-
Global Grouping 全域性分組
全部的流都分配到唯一的一個Bolt上,就是分配給ID最小的Task。
-
None Grouping 不分組
不分組的含義是,流不關心到底誰會收到它的Tuple。
-
CustomStreamGrouping 自定義分組
寫一個類實現CustomStreamGrouping介面,其中要實現2個方法:
- prepare 執行時呼叫,用來初始化分組資訊
- chooseTasks 核心方法,用來進行task的選擇
public MyGrouping implements CustomStreamGrouping{ private List<Integer> tasks; @Override /** * 執行時呼叫,用來初始化分組資訊 * context:topology上下文物件 * stream:待分組資料流屬性 * targetTasks:所有待選task的識別符號列表 */ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks){ this.tasks = targetTasks; } @Override /** * 核心方法,進行task選擇 * taskId:傳送tuple的元件id * values:tuple的值 * 返回值要發往哪個task */ public List<Integer> chooseTasks(int taskId, List<Object> values){ return Arrays.asList(tasks.get(0));//所有資訊發去第一個task中 } }
併發元件
1 Nodes 伺服器
Storm叢集中的一個伺服器,會執行Topology的一部分運算,一個Storm叢集中包含一個或者多個Node,由物理機器確定,不能用邏輯調控
2 Workers JVM程序
3 Executor 執行執行緒
4 Task bolt或spout例項的物件
叢集搭建
叢集工作元件
每一個工作節點上執行的Supervisor監聽分配給它那臺機器的工作,根據需要啟動/關閉工作程序,每一個工作程序執行一個Topology的一個子集;一個執行的Topology由執行在很多機器上的很多工作程序Worker組成。那麼Storm的核心就是 主節點(Nimbus)、工作節點(Supervisor)、協調器(ZooKeeper)、工作程序(Worker)、任務執行緒(Task)。
1 Nimbus 主節點
-
主要任務
管理,協調和監控在叢集上執行的topology,負責topology的初始化、任務分發和程序監控。包括topology的釋出,任務指派,事件處理失敗時重新指派任務。
-
工作流程
-
分發jar包:
將topology釋出到Storm叢集,將預先打包成jar檔案的topology和配置資訊提交(submitting)到nimbus伺服器上。一旦nimbus接收到了topology的壓縮包,會將jar包分發到足夠數量的supervisor節點上。
-
指派任務:
當supervisor節點接收到了topology壓縮檔案,nimbus就會指派task(bolt和spout例項)到每個supervisor並且傳送訊號指示supervisoer生成足夠的worker來執行指派的task。
-
-
故障處理
-
檢測supervisor節點活躍度
記錄所有的supervisor節點的狀態和分配給他們的task,如果nimbus發現某個supervisor沒有上報心跳或者已經不可達了,他將會將故障supervisor分配的task重新分配到叢集中的其他supervisor節點。
-
nimbus死亡
即使nimbus守護程序在topology執行時停止了,只要分配的supervisor和worker健康執行,topology會一直繼續處理資料,被稱為半容錯機制。
-
2 Supervisor 工作節點
Nimbus和Supervisor之間的協調則通過ZooKeeper系統。
-
主要任務
等待nimbus分配任務後,生成並監控workers、執行任務。
-
故障處理
supervisor和worker都是執行在不同的JVM程序上,如果supervisor啟動的worker程序因為錯誤異常退出,supervisor將會嘗試重新生成新的worker程序。
3 Zookeeper 協調服務元件
ZooKeeper是完成Nimbus和Supervisor之間協調的服務的中介軟體。Storm使用ZooKeeper協調叢集,由於ZooKeeper並不用於訊息傳遞,所以Storm給ZooKeeper帶來的壓力相當低。在大多數情況下,單個節點的ZooKeeper叢集足夠勝任,不過為了確保故障恢復或者部署大規模Storm叢集,可能需要更大規模的ZooKeeper叢集。
4 Worker 工作程序
執行具體處理元件邏輯的程序。
5 Task 任務執行緒
Worker中的每一個Spout/Bolt執行緒稱為一個Task。在Storm0.8之後,Task不再與物理執行緒對應,同一個Spout/Bolt的Task可能會共享一個物理執行緒,該執行緒稱為Executor。
面試題目
Storm和sparkstreaming的區別
Storm的原理的詳細介紹
Storm中如何實現統計uv的不重複。
對流式計算storm的認識?專案應用到storm的應用場景簡介?