storm架構及原理
storm 架構與原理
1 storm簡介
1.1 storm是什麼
-
如果只用一句話來描述 storm 是什麼的話:分散式 && 實時 計算系統。按照作者 Nathan Marz 的說法,storm對於實時計算的意義類似於hadoop對於批處理的意義。
-
Hadoop(大資料分析領域無可爭辯的王者)專注於批處理。這種模型對許多情形(比如為網頁建立索引)已經足夠,但還存在其他一些使用模型,它們需要來自高度動態的來源的實時資訊。為了解決這個問題,就得藉助 Nathan Marz 推出的 storm(現在已經被Apache孵化)storm 不處理靜態資料,但它處理連續的流資料。
1.2 storm 與傳統的大資料
-
storm 與其他大資料解決方案的不同之處在於它的處理方式。Hadoop 在本質上是一個批處理系統。資料被引入 Hadoop 檔案系統 (HDFS) 並分發到各個節點進行處理。當處理完成時,結果資料返回到 HDFS 供始發者使用。storm 支援建立拓撲結構來轉換沒有終點的資料流。不同於 Hadoop 作業,這些轉換從不停止,它們會持續處理到達的資料。
-
Hadoop 的核心是使用 Java™ 語言編寫的,但支援使用各種語言編寫的資料分析應用程式。而 Twitter Storm 是使用 Clojure語言實現的。
-
Clojure 是一種基於虛擬機器 (VM) 的語言,在 Java 虛擬機器上執行。但是,儘管 storm 是使用 Clojure 語言開發的,您仍然可以在 storm 中使用幾乎任何語言編寫應用程式。所需的只是一個連線到 storm 的架構的介面卡
2 Hadoop 架構的瓶頸
- Hadoop是優秀的大資料離線處理技術架構,主要採用的思想是“分而治之”,對大規模資料的計算進行分解,然後交由眾多的計算節點分別完成,再統一彙總計算結果。Hadoop架構通常的使用方式為批量收集輸入資料,批量計算,然後批量吐出計算結果。然而Hadoop結構在處理實時性要求較高的業務時,卻顯得力不從心。本章內容對Hadoop架構這種瓶頸的產生原因進行了探究。 實時性不足(基於離線)
2.1 Hadoop架構簡介
-
Hadoop架構的核心組成部分是HDFS(Hadoop Distributed File System,Hadoop分散式檔案系統)和MapReduce分散式計算框架。HDFS採用Master/Slave體系結構,在叢集中由一個主節點充當NameNode,負責檔案系統元資料的管理,其它多個子節點充當Datanode,負責儲存實際的資料塊。
-
MapReduce分散式計算模型由JobTracker和TaskTracker兩類服務程序實現,JobTracker負責任務的排程和管理,TaskTracker負責實際任務的執行。
2.2 Hadoop架構的瓶頸
-
在手機閱讀BI大屏時延專案中,業務需求為處理業務平臺產生的海量使用者資料,展現業務中PV(Page View,頁面瀏覽量)、UV(Unique Visitor,獨立訪客)。營收和付費使用者數等關鍵運營指標,供領導層實時瞭解運營狀況,做出經營決策,在一期專案的需求描述中,允許的計算時延是15分鐘。
-
根據需求,在一期專案的實施中,搭建了Hadoop平臺與Hive資料倉庫,通過編寫Hive儲存過程,來完成資料的處理,相當於是一個離線的批處理過程,不同的運營指標擁有不同的演算法公式,各公式的複雜程度不同導致各運營指標演算法複雜度不同,因此所需要的計算時延也各不相同,如PV指標的計算公式相對簡單,可以在5分鐘內完成計算,而頁面訪問成功率指標的計算公式相對複雜,需要10分鐘以上才能完成計算。專案到達二期階段時,對實時性的要求有了進一步提高,允許的計算時延減少到5分鐘,在這種應用場景下,Hadoop架構已經不能滿足需要,無法在指定的時延內完成所有運營指標的計算。
- 在以上的應用場景中,Hadoop的瓶頸主要體現在以下兩點:
- 1) MapReduce計算框架初始化較為耗時,並不適合小規模的批處理計算。因為MapReduce框架並非輕量級框架,在執行一個作業時,需要進行很多 初始化 的工作,主要包括檢查作業的輸入輸出路徑,將作業的輸入資料分塊,建立作業統計資訊以及將作業程式碼的Jar檔案和配置檔案拷貝到HDFS上,當輸入資料的規模很大時,框架初始化所耗費的時間遠遠小於計算所耗費的時間,所以初始化的時間可以忽略不計;而當輸入資料的規模較小時,初始化所耗費的時間甚至超過了計算所耗費的時間,導致計算效率低下,產生了效能上的瓶頸。
- 2) Reduce任務的計算速度較慢。有的運營指標計算公式較為複雜,為之編寫的Hive儲存過程經Hive直譯器解析後產生了Reduce任務,導致無法在指定的時延內完成計算。這是由於Reduce任務的計算過程分為三個階段,分別是copy階段,sort階段和reduce階段。其中copy階段要求每個計算節點從其它所有計算節點上抽取其所需的計算結果,copy操作需要佔用大量的網路頻寬,十分耗時,從而造成Reduce任務整體計算速度較慢。
3 storm 架構的優點
- storm的流式處理計算模式保證了任務能夠只進行一次初始化,就能夠持續計算,同時使用了ZeroMQ(Netty)作為底層訊息佇列,有效地提高了整體架構的資料處理效率,避免了Hadoop的瓶頸。
- Storm的適用場景:
- 1.流資料處理,Storm可以用來處理源源不斷流進來的訊息,處理之後將結果寫入到某個儲存中去。
- 2.分散式rpc,由於storm的處理元件是分散式的,而且處理延遲極低,所以可以作為一個通用的分散式rpc框架來使用。
- 3.持續計算,任務一次初始化,一直執行,除非你手動kill它。
3.1 storm架構的設計
- 與Hadoop主從架構一樣,Storm也採用Master/Slave體系結構,分散式計算由Nimbus和Supervisor兩類服務程序實現,Nimbus程序執行在叢集的主節點,負責任務的指派和分發,Supervisor執行在叢集的從節點,負責執行任務的具體部分。
- 如圖所示:
- Nimbus:負責資源分配和任務排程。
- Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker程序。
- Worker:執行具體處理元件邏輯的程序。
- Task:worker中每一個spout/bolt的執行緒稱為一個task。同一個spout/bolt的task可能會共享一個物理執行緒,該執行緒稱為executor。
- storm架構中使用Spout/Bolt程式設計模型來對訊息進行流式處理。訊息流是storm中對資料的基本抽象,一個訊息流是對一條輸入資料的封裝,源源不斷輸入的訊息流以分散式的方式被處理,Spout元件是訊息生產者,是storm架構中的資料輸入源頭,它可以從多種異構資料來源讀取資料,併發射訊息流,Bolt元件負責接收Spout元件發射的資訊流,並完成具體的處理邏輯。在複雜的業務邏輯中可以串聯多個Bolt元件,在每個Bolt元件中編寫各自不同的功能,從而實現整體的處理邏輯。
3.2 storm架構與Hadoop架構的對比
-
storm架構與Hadoop架構的總體結構相似。
結構 Hadoop Storm 主節點 JobTracker Nimbus 從節點 TaskTracker Supervisor 應用程式 Job Topology 工作程序名稱 Child Worker 計算模型 Map / Reduce Spout / Bolt -
在Hadoop架構中,主從節點分別執行JobTracker和TaskTracker程序,在storm架構中,主從節點分別執行Nimbus和Supervisor程序。在Hadoop架構中,應用程式的名稱是Job,Hadoop將一個Job解析為若干Map和Reduce任務,每個Map或Reduce任務都由一個Child程序來執行,該Child程序是由TaskTracker在子節點上產生的子程序。
-
在Storm架構中,應用程式的名稱是Topology,Storm將一個Topology劃分為若干個部分,每部分由一個Worker程序來執行,該Worker程序是Supervisor在子節點上產生的子程序,在每個Worker程序中存在著若干Spout和Bolt執行緒,分別負責Spout和Bolt元件的資料處理過程。
-
從應用程式的比較中可以看明顯地看到Hadoop和Storm架構的主要不同之處。在Hadoop架構中,應用程式Job代表著這樣的作業:輸入是確定的,作業可以在有限時間內完成,當作業完成時Job的生命週期走到終點,輸出確定的計算結果;而在Storm架構中,Topology代表的並不是確定的作業,而是持續的計算過程,在確定的業務邏輯處理框架下,輸入資料來源源不斷地進入系統,經過流式處理後以較低的延遲產生輸出。如果不主動結束這個Topology或者關閉Storm叢集,那麼資料處理的過程就會持續地進行下去。
- 通過以上的分析,我們可以看到,storm架構是如何解決Hadoop架構瓶頸的:
- Storm的Topology只需初始化一次。在將Topology提交到Storm叢集的時候,叢集會針對該Topology做一次初始化的工作,此後,在Topology執行過程中,對於輸入資料而言,是沒有計算框架初始化耗時的,有效避免了計算框架初始化的時間損耗。
- Storm使用Netty作為底層的訊息佇列來傳遞訊息,保證訊息能夠得到快速的處理,同時Storm採用記憶體計算模式,無需藉助檔案儲存,直接通過網路直傳中間計算結果,避免了元件之間傳輸資料的大量時間損耗。
3.3 storm的優點
- Storm 實現的一些特徵決定了它的效能和可靠性的,Storm 使用 Netty 傳送訊息,這就消除了中間的排隊過程,使得訊息能夠直接在任務自身之間流動,在訊息的背後,是一種用於序列化和反序列化 Storm 的原語型別的自動化且高效的機制。
-
Storm 的一個最有趣的地方是它注重容錯和管理,Storm 實現了有保障的訊息處理,所以每個元組(Turple)都會通過該拓撲(Topology)結構進行全面處理;如果發現一個元組還未處理,它會自動從Spout處重發,Storm 還實現了任務級的故障檢測,在一個任務發生故障時,訊息會自動重新分配以快速重新開始處理。Storm 包含比 Hadoop 更智慧的處理管理,流程會由zookeeper來進行管理,以確保資源得到充分使用。
- 總結一下,有以下優點:
- 簡單程式設計,在大資料處理方面相信大家對hadoop已經耳熟能詳,基於Google Map/Reduce來實現的Hadoop為開發者提供了map、reduce原語,使並行批處理程式變得非常地簡單和優美。同樣,Storm也為大資料的實時計算提供了一些簡單優美的原語,這大大降低了開發並行實時處理的任務的複雜性,幫助你快速、高效的開發應用。
- 多語言支援,除了用java實現spout和bolt,你還可以使用任何你熟悉的程式語言來完成這項工作,這一切得益於Storm所謂的多語言協議。多語言協議是Storm內部的一種特殊協議,允許spout或者bolt使用標準輸入和標準輸出來進行訊息傳遞,傳遞的訊息為單行文字或者是json編碼的多行。
- 支援水平擴充套件,在Storm叢集中真正執行topology的主要有三個實體:工作程序、執行緒和任務。Storm叢集中的每臺機器上都可以執行多個工作程序,每個工作程序又可建立多個執行緒,每個執行緒可以執行多個任務,任務是真正進行資料處理的實體,我們開發的spout、bolt就是作為一個或者多個任務的方式執行的。因此,計算任務在多個執行緒,程序和伺服器之間並行進行,支援靈活的水平擴充套件。
- 容錯性強,如果在訊息處理過程中出了一些異常,Storm會重新安排這個出問題的處理單元,Storm保證一個處理單元永遠執行(除非你顯式殺掉這個處理單元)。
- 可靠性的訊息保證 Storm可以保證spout發出的每條訊息都能被“完全處理”。
- 快速的訊息處理,用Netty作為底層訊息佇列, 保證訊息能快速被處理。
- 本地模式,支援快速程式設計測試。
4 其他大資料解決方案
- 自 Google 在 2004 年推出 MapReduce 正規化以來,已誕生了多個使用原始 MapReduce 正規化(或擁有該正規化的質量)的解決方案。Google 對 MapReduce 的最初應用是建立全球資訊網的索引。儘管此應用程式仍然很流行,但這個簡單模型解決的問題也正在增多。
- 下標中提供了一個可用開源大資料解決方案的列表,包括傳統的批處理和流式處理應用程式。在將 Storm 引入開源之前將近一年的時間裡,Yahoo! 的 S4 分散式流計算平臺已向 Apache 開源。S4 於 2010 年 10 月釋出,它提供了一個高效能運算 (HPC) 平臺,嚮應用程式開發人員隱藏了並行處理的複雜性。S4 實現了一個可擴充套件的、分散化的叢集架構,並納入了部分容錯功能。
解決方案 | 開發商 | 型別 | 描述 |
---|---|---|---|
storm | 流式處理 | Twitter 的新流式大資料分析解決方案 | |
S4 | Yahoo! | 流式處理 | 來自 Yahoo! 的分散式流計算平臺 |
Hadoop | Apache | 批處理 | MapReduce 正規化的第一個開源實現 |
Spark | UC Berkeley AMPLab | 批處理 | 支援記憶體中資料集和恢復能力的最新分析平臺 |
Disco | Nokia | 批處理 | Nokia 的分散式 MapReduce 框架 |
HPCC | LexisNexis | 批處理 | HPC 大資料叢集 |
5 storm基本概念
-
下面介紹Storm的基本概念和資料流模型。Storm是一個開源的實時計算系統,它提供了一系列的基本元素用於進行計算:Topology、Stream、Spout、Bolt等等。
-
Storm叢集和Hadoop叢集表面上看很類似。但是Hadoop上執行的是MapReduce jobs,而在Storm上執行的是拓撲(topology),這兩者之間是非常不一樣的,一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會執行(除非你手動kill掉)。
-
在Storm的叢集裡面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面執行一個叫Nimbus後臺程式,它的作用類似Hadoop裡面的JobTracker,Nimbus負責在叢集裡面分發程式碼,分配計算任務給機器,並且監控狀態。每一個工作節點上面執行一個叫做Supervisor的程序。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作程序worker。每一個工作程序執行一個topology的一個子集;一個執行的topology由執行在很多機器上的很多工作程序worker組成。(一個supervisor裡面有多個workder,一個worker是一個JVM。可以配置worker的數量,對應的是conf/storm.yaml中的supervisor.slot的數量)
- Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper叢集完成。另外,Nimbus程序和Supervisor程序都是快速失敗(fail-fast)和無狀態的。所有的狀態要麼在zookeeper裡面, 要麼在本地磁碟上。這也就意味著你可以用kill -9來殺死Nimbus和Supervisor程序,然後再重啟它們,就好像什麼都沒有發生過,這個設計使得Storm異常的穩定。
5.1 Topology
- 在Storm中,一個實時應用的計算任務被打包作為Topology釋出,這同Hadoop的MapReduce任務相似。但是有一點不同的是:在Hadoop中,MapReduce任務最終會執行完成後結束;而在Storm中,Topology任務一旦提交後永遠不會結束,除非你顯示去停止任務。計算任務Topology是由不同的Spouts和Bolts,通過資料流(Stream)連線起來的圖。下面是一個Topology的結構示意圖:
- 其中包含有:
- Spout:Storm中的訊息源,用於為Topology生產訊息(資料),一般是從外部資料來源(如Message Queue、RDBMS、NoSQL、Realtime Log )不間斷地讀取資料併發送給Topology訊息(tuple元組)。
- Bolt:Storm中的訊息處理者,用於為Topology進行訊息的處理,Bolt可以執行過濾,聚合, 查詢資料庫等操作,而且可以一級一級的進行處理。
-
下圖是Storm的資料互動圖,可以看出兩個模組Nimbus和Supervisor之間沒有直接互動。狀態都是儲存在Zookeeper上,Worker之間通過Netty傳送資料。Storm與Zookeeper之間的互動過程,暫時不細說了。重要的一點:storm所有的元資料資訊儲存在Zookeeper中!
5.2 資料模型Turple
-
storm使用tuple來作為它的資料模型。每個tuple是一堆值,每個值有一個名字,並且每個值可以是任何型別,在我的理解裡面一個tuple可以看作一個java物件。總體來看,storm支援所有的基本型別:字串以及位元組陣列作為tuple的值型別。你也可以使用你自己定義的型別來作為值型別,只要你實現對應的序列化器(serializer)。
-
一個Tuple代表資料流中的一個基本的處理單元,它可以包含多個Field,每個Field表示一個屬性。比如舉例一個,三個欄位(taskID:int; StreamID:String; ValueList: List):
-
Tuple是一個Key-Value的Map,由於各個元件間傳遞的tuple的欄位名稱已經事先定義好了,所以Tuple只需要按序填入各個Value,所以就是一個Value List。一個沒有邊界的,源源不斷的,連續的Tuple序列就組成了Stream。
- topology裡面的每個節點必須定義它要發射的tuple的每個欄位。
5.3 worker(程序)
- 一個topology可能會在一個或者多個worker(工作程序)裡面執行,每個worker是一個物理JVM並且執行整個topology的一部分。比如,對於並行度是300的topology來說,如果我們使用50個工作程序worker來執行,那麼每個工作程序會處理其中的6個tasks。Storm會盡量均勻的工作分配給所有的worker,setBolt 的最後一個引數是你想為bolts的並行量。
5.4 Spouts
-
訊息源spout是Storm裡面一個topology裡面的訊息生產者。一般來說訊息源會從一個外部源讀取資料並且向topology裡面發出訊息:tuple。Spout可以是可靠的也可以是不可靠的,如果這個tuple沒有被storm成功處理,可靠的訊息源spouts可以重新發射一個tuple,但是不可靠的訊息源spouts一旦發出一個tuple就不能重發了。
-
訊息源可以發射多條訊息流stream。使用OutputFieldsDeclarer。declareStream來定義多個stream,然後使用SpoutOutputCollector來發射指定的stream。程式碼上是這樣的:collector.emit(new Values(str));
-
Spout類裡面最重要的方法是nextTuple。要麼發射一個新的tuple到topology裡面或者簡單的返回如果已經沒有新的tuple。要注意的是nextTuple方法不能阻塞,因為storm在同一個執行緒上面呼叫所有訊息源spout的方法。另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候呼叫ack,否則呼叫fail。storm只對可靠的spout呼叫ack和fail。
5.5 Bolts
-
所有的訊息處理邏輯被封裝在bolts裡面。Bolts可以做很多事情:過濾,聚合,查詢資料庫等等。
-
Bolts可以簡單的做訊息流的傳遞(來一個元組,呼叫一次execute)。複雜的訊息流處理往往需要很多步驟,從而也就需要經過很多bolts。比如算出一堆圖片裡面被轉發最多的圖片就至少需要兩步:第一步算出每個圖片的轉發數量,第二步找出轉發最多的前10個圖片。(如果要把這個過程做得更具有擴充套件性那麼可能需要更多的步驟)。
-
Bolts可以發射多條訊息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。
-
Bolts的主要方法是execute,它以一個tuple作為輸入,bolts使用OutputCollector來發射tuple(spout使用SpoutOutputCollector來發射指定的stream),bolts必須要為它處理的每一個tuple呼叫OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發射者spouts。一般的流程是: bolts處理一個輸入tuple, 發射0個或者多個tuple, 然後呼叫ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動呼叫ack。
5.6 Reliability
- Storm保證每個tuple會被topology完整的執行。Storm會追蹤由每個spout tuple所產生的tuple樹(一個bolt處理一個tuple之後可能會發射別的tuple從而形成樹狀結構),並且跟蹤這棵tuple樹什麼時候成功處理完。每個topology都有一個訊息超時的設定,如果storm在這個超時的時間內檢測不到某個tuple樹到底有沒有執行成功,那麼topology會把這個tuple標記為執行失敗,並且過一會兒重新發射這個tuple(超時的時間在storm0.9.0.1版本中是可以設定的,預設是30s)。
5.7 Tasks
- 每一個spout和bolt會被當作很多task在整個叢集裡執行。每一個executor對應到一個執行緒,在這個執行緒上執行多個task,而stream grouping則是定義怎麼從一堆task發射tuple到另外一堆task。你可以呼叫TopologyBuilder類的setSpout和setBolt來設定並行度。SetSpout裡面的並行度引數含義:parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster。(執行這個spout安排了N個tasks。每個task是一個執行緒,他們都在同一個程序中。)setBolt的引數含義也是一樣的。
6 Storm資料流模型
- 資料流(Stream)是Storm中對資料進行的抽象,它是時間上無界的tuple元組序列。在Topology中,Spout是Stream的源頭。負責為Topology從特定資料來源發射Stream;Bolt可以接收任意多個Stream作為輸入,然後進行資料的加工處理過程,如果需要,Bolt還可以發射出新的Stream給下級Bolt進行處理。下面是一個Topology內部Spout和Bolt之間的資料流關係:
- Topology中每一個計算元件(Spout和Bolt)都有一個並行執行度,在建立Topology時可以進行指定,Storm會在叢集內分配對應並行度個數的執行緒來同時執行這一元件。那麼,有一個問題:既然對於一個Spout或Bolt,都會有多個task執行緒來執行,那麼如何在兩個元件(Spout和Bolt)之間傳送tuple元組呢?Storm提供了若干種資料流分發(Stream Grouping)策略用來解決這一問題。在Topology定義時,需要為每個Bolt指定接收什麼樣的Stream作為其輸入(注:Spout並不需要接收Stream,只會發射Stream)。目前Storm中提供了以下7種Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping。
6.1 Stream groupings
- Storm裡面有7種類型的stream grouping
- Shuffle Grouping: 隨機分組, 隨機派發stream裡面的tuple,保證每個bolt接收到的tuple數目大致相同。
- Fields Grouping:按欄位分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts裡的一個task。而不同的userid則會被分配到不同的bolts裡的task。
- All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
- Global Grouping:全域性分組, 這個tuple被分配到storm中的一個bolt的其中一個task,再具體一點就是分配給id值最低的那個task。
- Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果。有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個執行緒裡面去執行。
- Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味著訊息的傳送者指定由訊息接收者的哪個task處理這個訊息。。只有被宣告為Direct Stream的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用emitDirect方法來發射。訊息處理者可以通過TopologyContext來獲取處理它的訊息的task的id (OutputCollector.emit方法也會返回task的id)。
- Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作程序worker中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
6.2 storm 記錄級容錯
-
相比於s4, puma等其他實時計算系統,storm最大的亮點在於其記錄級容錯和能夠保證訊息精確處理的事務功能,下面就重點來看一下這兩個亮點的實現原理。
-
Storm記錄級容錯的基本原理。首先來看一下什麼叫做記錄級容錯?storm允許使用者在spout中發射一個新的源tuple時為其指定一個message id,這個message id可以是任意的object物件。多個源tuple可以共用一個message id,表示這多個源 tuple對使用者來說是同一個訊息單元。storm中記錄級容錯的意思是說,storm會告知使用者每一個訊息單元是否在指定時間內被完全處理了。那什麼叫做完全處理呢,就是該message id繫結的源tuple及由該源tuple後續生成的tuple經過了topology中每一個應該到達的bolt的處理。舉個例子。如下圖,在spout由message 1繫結的tuple1和tuple2經過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被完全處理了。
- 在storm的topology中有一個系統級元件,叫做acker。這個acker的任務就是追蹤從spout中流出來的每一個message id繫結的若干tuple的處理路徑,如果在使用者設定的最大超時時間內這些tuple沒有被完全處理,那麼acker就會告知spout該訊息處理失敗了,相反則會告知spout該訊息處理成功了。在剛才的描述中,我們提到了”記錄tuple的處理路徑”,如果曾經嘗試過這麼做的同學可以仔細地思考一下這件事的複雜程度。但是storm中卻是使用了一種非常巧妙的方法做到了。在說明這個方法之前,我們來複習一個數學定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。 - storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會為使用者指定的message id生成一個對應的64位整數,作為一個root id。root id會傳遞給acker及後續的bolt作為該訊息單元的唯一標識。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple之後,會告知acker自己發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完之後,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker只需要對這些id做一個簡單的異或運算,就能判斷出該root
id對應的訊息單元是否處理完成了。下面通過一個圖示來說明這個過程。
- 第1步:初始化 spout中繫結message 1生成了兩個源tuple,id分別是0010和1011。
- 第2步:計算一個turple達到第1個bolt。bolt1處理tuple 0010時生成了一個新的tuple,id為0110。
- 第3步:計算一個turple達到第2個bolt,bolt2處理tuple 1011時生成了一個新的tuple,id為0111。
- 第4步:訊息到達最後一個bolt。
- 第1步:初始化 spout中繫結message 1生成了兩個源tuple,id分別是0010和1011。
- 可能有些細心的同學會發現,容錯過程存在一個可能出錯的地方,那就是,如果生成的tuple id並不是完全各異的,acker可能會在訊息單元完全處理完成之前就錯誤的計算為0。這個錯誤在理論上的確是存在的,但是在實際中其概率是極低極低的,完全可以忽略。
6.3 Storm的事務拓撲
- 事務拓撲(transactional topology)是storm0.7引入的特性,在0.8版本以後的版本中已經被封裝為Trident,提供了更加便利和直觀的介面。因為篇幅所限,在此對事務拓撲做一個簡單的介紹。
- 事務拓撲的目的是為了滿足對訊息處理有著極其嚴格要求的場景,例如實時計算某個使用者的成交筆數,要求結果完全精確,不能多也不能少。Storm的事務拓撲是完全基於它底層的spout/bolt/acker原語實現的。通過一層巧妙的封裝得出一個優雅的實現。
- 事務拓撲簡單來說就是將訊息分為一個個的批(batch),同一批內的訊息以及批與批之間的訊息可以並行處理,另一方面,使用者可以設定某些bolt為committer,storm可以保證committer的finishBatch()操作是按嚴格不降序的順序執行的。使用者可以利用這個特性通過簡單的程式設計技巧實現訊息處理的精確。
7 並法度的理解
7.1 Example of a running topology
- The following illustration shows how a simple topology would look like in operation. The topology consists of three components: one spout called BlueSpout and two bolts called GreenBolt and YellowBolt. The components are linked such that BlueSpout sends its output to GreenBolt, which in turns sends its own output to YellowBolt.
- 總結:
- 一個Topology可以包含多個worker ,一個worker只能對應於一個topology。worker process是一個topology的子集。
- 一個worker可以包含多個executor,一個executor只能對應於一個component(spout或者bolt)。
- Task就是具體的處理邏輯,一個executor執行緒可以執行一個或多個tasks。執行緒就是資源,task就是要執行的任務。
7.2 併發度的配置有效的順序
- Storm currently has the following order of precedence for configuration settings:
defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration。