1. 程式人生 > >流式處理框架storm淺析

流式處理框架storm淺析

min 完成 文檔 efault 生成 沒有 ado 編程 現在

前言
前一段時間參與哨兵流式監控功能設計,調研了兩個可以做流式計算的框架:storm和spark streaming,我負責storm的調研工作。斷斷續續花了一周的時間看了官網上的doc和網絡上的一些資料。我把所學到的總結成一個文檔,發出來給對storm感興趣的同事做入門引導。

storm背景
隨著互聯網的更進一步發展,從Portal信息瀏覽型到Search信息搜索型到SNS關系交互傳遞型,以及電子商務、互聯網旅遊生活產品等將生活中的流通環節在線化。對效率的要求讓大家對於實時性的要求進一步提升,而信息的交互和溝通正在從點對點往信息鏈甚至信息網的方向發展,這樣必然帶來數據在各個維度的交叉關聯,數據爆炸已不可避免。因此流式處理加NoSQL產品應運而生,分別解決實時框架和數據大規模存儲計算的問題。

2011年twitter對Storm開源。以前互聯網的開發人員在做一個實時應用的時候,除了要關註應用邏輯計算處理本身,還要為了數據的實時流轉、交互、分布大傷腦筋。現在開發人員可以快速的搭建一套健壯、易用的實時流處理框架,配合SQL產品或者NoSQL產品或者MapReduce計算平臺,就可以低成本的做出很多以前很難想象的實時產品:比如一淘數據部的量子恒道品牌旗下的多個產品就是構建在實時流處理平臺上的。

strom語言
Storm的主要開發語言是clojure,完成核心功能邏輯,輔助開發語言還有Python和java

strom的特點

  1. 編程模型簡單

    Storm同hadoop一樣,為大數據的實時計算提供了一些簡單優美的原語,這大大降低了開發並行實時處理的任務的復雜性,幫助你快速、高效的開發應用。

  2. 可擴展

    在Storm集群中真正運行topology的主要有三個實體:工作進程、線程和任務。Storm集群中的每臺機器上都可以運行多個工作進程,每個工作進程又可創建多個線程,每個線程可以執行多個任務,任務是真正進行數據處理的實體,我們開發的spout、bolt就是作為一個或者多個任務的方式執行的。 因此,計算任務在多個線程、進程和服務器之間並行進行,支持靈活的水平擴展。

  3. 高可靠

    Storm可以保證spout發出的每條消息都能被“完全處理”。 spout發出的消息後續可能會觸發產生成千上萬條消息,可以形象的理解為一棵消息樹,其中spout發出的消息為樹根,Storm會跟蹤這棵消息樹的處理情況,只有當這棵消息樹中的所有消息都被處理了,Storm才會認為spout發出的這個消息已經被“完全處理”。如果這棵消息樹中的任何一個消息處理失敗了,或者整棵消息樹在限定的時間內沒有“完全處理”,那麽spout發出的消息就會重發。 考慮到盡可能減少對內存的消耗,Storm並不會跟蹤消息樹中的每個消息,而是采用了一些特殊的策略,它把消息樹當作一個整體來跟蹤,對消息樹中所有消息的唯一id進行異或計算,通過是否為零來判定spout發出的消息是否被“完全處理”,這極大的節約了內存和簡化了判定邏輯,後面會在下文對這種機制進行詳細介紹。

    這種模式,每發送一個消息,都會同步發送一個ack/fail,對於網絡的帶寬會有一定的消耗,如果對於可靠性要求不高,可通過使用不同的emit接口關閉該模式。

    上面所說的,Storm保證了每個消息至少被處理一次,但是對於有些計算場合,會嚴格要求每個消息只被處理一次,Storm的0.7.0引入了事務性拓撲,解決了這個問題。

  4. 高容錯

    如果在消息處理過程中出了一些異常,Storm會重新安排這個出問題的處理單元。Storm保證一個處理單元永遠運行(除非你顯式殺掉這個處理單元)。當然,如果處理單元中存儲了中間狀態,那麽當處理單元重新被Storm啟動的時候,需要應用自己處理中間狀態的恢復。

  5. 快速

    這裏的快主要是指的時延。storm的網絡直傳、內存計算,其時延必然比hadoop的通過hdfs傳輸低得多;當計算模型比較適合流式時,storm的流式處理,省去了批處理的收集數據的時間;因為storm是服務型的作業,也省去了作業調度的時延。所以從時延上來看,storm要快於hadoop。

    說一個典型的場景,幾千個日誌生產方產生日誌文件,需要進行一些ETL操作存入一個數據庫。

    假設利用hadoop,則需要先存入hdfs,按每一分鐘切一個文件的粒度來算(這個粒度已經極端的細了,再小的話hdfs上會一堆小文件),hadoop開始計算時,1分鐘已經過去了,然後再開始調度任務又花了一分鐘,然後作業運行起來,假設機器特別多,幾鈔鐘就算完了,然後寫數據庫假設也花了很少的時間,這樣,從數據產生到最後可以使用已經過去了至少兩分多鐘。

    而流式計算則是數據產生時,則有一個程序去一直監控日誌的產生,產生一行就通過一個傳輸系統發給流式計算系統,然後流式計算系統直接處理,處理完之後直接寫入數據庫,每條數據從產生到寫入數據庫,在資源充足時可以在毫秒級別完成。

  6. 支持多種編程語言

    除了用java實現spout和bolt,你還可以使用任何你熟悉的編程語言來完成這項工作,這一切得益於Storm所謂的多語言協議。多語言協議是Storm內部的一種特殊協議,允許spout或者bolt使用標準輸入和標準輸出來進行消息傳遞,傳遞的消息為單行文本或者是json編碼的多行。

  7. 支持本地模式

    Storm有一種“本地模式”,也就是在進程中模擬一個Storm集群的所有功能,以本地模式運行topology跟在集群上運行topology類似,這對於我們開發和測試來說非常有用。

storm的組成
在Storm的集群裏面有兩種節點: 控制節點(master node)和工作節點(worker node)。控制節點上面運行一個叫Nimbus後臺程序,它的作用類似Hadoop裏面的JobTracker。Nimbus負責在集群裏面分發代碼,分配計算任務給機器, 並且監控狀態。

每一個工作節點上面運行一個叫做Supervisor的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個topology的一個子集;一個運行的topology由運行在很多機器上的很多工作進程組成。

Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper集群完成。另外,Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要麽在zookeeper裏面, 要麽在本地磁盤上。這也就意味著你可以用kill -9來殺死Nimbus和Supervisor進程, 然後再重啟它們,就好像什麽都沒有發生過。這個設計使得Storm異常的穩定。

接下來我們再來具體看一下這些概念。

Nimbus:負責資源分配和任務調度。

Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker進程。

Worker:運行具體處理組件邏輯的進程。

Task:worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之後,task不再與物理線程對應,同一個spout/bolt的task可能會共享一個物理線程,該線程稱為executor。

下面這個圖描述了以上幾個角色之間的關系。

技術分享圖片

圖片描述(最多50字)

Topology基本原理
Storm集群和Hadoop集群表面上看很類似。但是Hadoop上運行的是MapReduce jobs,而在Storm上運行的是拓撲(topology),這兩者之間是非常不一樣的。一個關鍵的區別是: 一個MapReduce job最終會結束, 而一個topology永遠會運行(除非你手動kill掉)。

1 拓撲(Topologies)

一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連接起來,如下圖:

技術分享圖片

圖片描述(最多50字)

一個topology會一直運行直到你手動kill掉,Storm自動重新分配執行失敗的任務, 並且Storm可以保證你不會有數據丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上。

2 流(Streams)

數據流(Streams)是 Storm 中最核心的抽象概念。一個數據流指的是在分布式環境中並行創建、處理的一組元組(tuple)的×××序列。數據流可以由一種能夠表述數據流中元組的域(fields)的模式來定義。在默認情況下,元組(tuple)包含有整型(Integer)數字、長整型(Long)數字、短整型(Short)數字、字節(Byte)、雙精度浮點數(Double)、單精度浮點數(Float)、布爾值以及字節數組等基本類型對象。當然,你也可以通過定義可序列化的對象來實現自定義的元組類型。

3 數據源(Spouts)

數據源(Spout)是拓撲中數據流的來源。一般 Spout 會從一個外部的數據源讀取元組然後將他們發送到拓撲中。根據需求的不同,Spout 既可以定義為可靠的數據源,也可以定義為不可靠的數據源。一個可靠的 Spout 能夠在它發送的元組處理失敗時重新發送該元組,以確保所有的元組都能得到正確的處理;相對應的,不可靠的 Spout 就不會在元組發送之後對元組進行任何其他的處理。

一個 Spout 可以發送多個數據流。為了實現這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream 方法來聲明定義不同的數據流,然後在發送數據時在 SpoutOutputCollector 的 emit 方法中將數據流 id 作為參數來實現數據發送的功能。

Spout 中的關鍵方法是 nextTuple。顧名思義,nextTuple 要麽會向拓撲中發送一個新的元組,要麽會在沒有可發送的元組時直接返回。需要特別註意的是,由於 Storm 是在同一個線程中調用所有的 Spout 方法,nextTuple 不能被 Spout 的任何其他功能方法所阻塞,否則會直接導致數據流的中斷。

Spout 中另外兩個關鍵方法是 ack 和 fail,他們分別用於在 Storm 檢測到一個發送過的元組已經被成功處理或處理失敗後的進一步處理。註意,ack 和 fail 方法僅僅對上述“可靠的” Spout 有效。

4 數據流處理組件(Bolts)

拓撲中所有的數據處理均是由 Bolt 完成的。通過數據過濾(filtering)、函數處理(functions)、聚合(aggregations)、聯結(joins)、數據庫交互等功能,Bolt 幾乎能夠完成任何一種數據處理需求

一個 Bolt 可以實現簡單的數據流轉換,而更復雜的數據流變換通常需要使用多個 Bolt 並通過多個步驟完成。例如,將一個微博數據流轉換成一個趨勢圖像的數據流至少包含兩個步驟:其中一個 Bolt 用於對每個圖片的微博轉發進行滾動計數,另一個或多個 Bolt 將數據流輸出為“轉發最多的圖片”結果(相對於使用2個Bolt,如果使用3個 Bolt 你可以讓這種轉換具有更好的可擴展性)。

與 Spout 相同,Bolt 也可以輸出多個數據流。為了實現這個功能,可以先通過 OutputFieldsDeclarer 的 declareStream 方法來聲明定義不同的數據流,然後在發送數據時在 OutputCollector 的 emit 方法中將數據流 id 作為參數來實現數據發送的功能。

在定義 Bolt 的輸入數據流時,你需要從其他的 Storm 組件中訂閱指定的數據流。如果你需要從其他所有的組件中訂閱數據流,你就必須要在定義 Bolt 時分別註冊每一個組件。對於聲明為默認 id(即上文中提到的“default”——譯者註)的數據流,InputDeclarer支持訂閱此類數據流的語法糖。也就是說,如果需要訂閱來自組件“1”的數據流,declarer.shuffleGrouping("1") 與 declarer.shuffleGrouping("1", DEFAULT_STREAM_ID) 兩種聲明方式是等價的。

Bolt 的關鍵方法是 execute 方法。execute 方法負責接收一個元組作為輸入,並且使用 OutputCollector 對象發送新的元組。如果有消息可靠性保障的需求,Bolt 必須為它所處理的每個元組調用 OutputCollector 的 ack 方法,以便 Storm 能夠了解元組是否處理完成(並且最終決定是否可以響應最初的 Spout 輸出元組樹)。一般情況下,對於每個輸入元組,在處理之後可以根據需要選擇不發送還是發送多個新元組,然後再響應(ack)輸入元組。IBasicBolt 接口能夠實現元組的自動應答。

5 數據流分組(Stream groupings)

為拓撲中的每個 Bolt 的確定輸入數據流是定義一個拓撲的重要環節。數據流分組定義了在 Bolt 的不同任務(tasks)中劃分數據流的方式。

在 Storm 中有八種內置的數據流分組方式,而且你還可以通過CustomStreamGrouping 接口實現自定義的數據流分組模型。這八種分組分時分別為:

1. 隨機分組(Shuffle grouping):這種方式下元組會被盡可能隨機地分配到 Bolt 的不同任務(tasks)中,使得每個任務所處理元組數量能夠能夠保持基本一致,以確保集群的負載均衡。

2. 域分組(Fields grouping):這種方式下數據流根據定義的“域”來進行分組。例如,如果某個數據流是基於一個名為“user-id”的域進行分組的,那麽所有包含相同的“user-id”的元組都會被分配到同一個任務中,這樣就可以確保消息處理的一致性。

3. 部分關鍵字分組(Partial Key grouping):這種方式與域分組很相似,根據定義的域來對數據流進行分組,不同的是,這種方式會考慮下遊 Bolt 數據處理的均衡性問題,在輸入數據源關鍵字不平衡時會有更好的性能1。感興趣的讀者可以參考這篇論文,其中詳細解釋了這種分組方式的工作原理以及它的優點。

4. 完全分組(All grouping):這種方式下數據流會被同時發送到 Bolt 的所有任務中(也就是說同一個元組會被復制多份然後被所有的任務處理),使用這種分組方式要特別小心。

5. 全局分組(Global grouping):這種方式下所有的數據流都會被發送到 Bolt 的同一個任務中,也就是 id 最小的那個任務。

6. 非分組(None grouping):使用這種方式說明你不關心數據流如何分組。目前這種方式的結果與隨機分組完全等效,不過未來 Storm 社區可能會考慮通過非分組方式來讓 Bolt 和它所訂閱的 Spout 或 Bolt 在同一個線程中執行。

7. 直接分組(Direct grouping):這是一種特殊的分組方式。使用這種方式意味著元組的發送者可以指定下遊的哪個任務可以接收這個元組。只有在數據流被聲明為直接數據流時才能夠使用直接分組方式。使用直接數據流發送元組需要使用 OutputCollector 的其中一個 emitDirect 方法。Bolt 可以通過 TopologyContext 來獲取它的下遊消費者的任務 id,也可以通過跟蹤 OutputCollector 的 emit 方法(該方法會返回它所發送元組的目標任務的 id)的數據來獲取任務 id。

8. 本地或隨機分組(Local or shuffle grouping):如果在源組件的 worker 進程裏目標 Bolt 有一個或更多的任務線程,元組會被隨機分配到那些同進程的任務中。換句話說,這與隨機分組的方式具有相似的效果。

6 任務(Tasks)

在 Storm 集群中每個 Spout 和 Bolt 都由若幹個任務(tasks)來執行。每個任務都與一個執行線程相對應。數據流分組可以決定如何由一組任務向另一組任務發送元組。你可以在 TopologyBuilder 的 setSpout 方法和 setBolt 方法中設置 Spout/Bolt 的並行度。

7 工作進程(Workers)

拓撲是在一個或多個工作進程(worker processes)中運行的。每個工作進程都是一個實際的 JVM 進程,並且執行拓撲的一個子集。例如,如果拓撲的並行度定義為300,工作進程數定義為50,那麽每個工作進程就會執行6個任務(進程內部的線程)。Storm 會在所有的 worker 中分散任務,以便實現集群的負載均衡。

流式處理框架storm淺析