分散式流式計算框架 Storm
- 場景
伴隨著資訊科技日新月異的發展,資訊呈現出爆發式的膨脹,人們獲取資訊的途徑也更加多樣、更加便捷,同時對於資訊的時效性要求也越來越高。舉個搜尋場景中的例子,當一個賣家釋出了一條寶貝資訊時,他希望的當然是這個寶貝馬上就可以被賣家搜尋出來、點選、購買啦,相反,如果這個寶貝要等到第二天或者更久才可以被搜出來,估計這個大哥就要罵娘了。再舉一個推薦的例子,如果使用者昨天在淘寶上買了一雙襪子,今天想買一副泳鏡去游泳,但是卻發現系統在不遺餘力地給他推薦襪子、鞋子,根本對他今天尋找泳鏡的行為視而不見,估計這哥們心裡就會想推薦你妹呀。其實稍微瞭解點背景知識的碼農們都知道,這是因為後臺系統做的是每天一次的全量處理,而且大多是在夜深人靜之時做的,那麼你今天白天做的事情當然要明天才能反映出來啦。
- 實現一個實時計算系統
全量資料處理使用的大多是鼎鼎大名的Hadoop或者Hive,作為一個批處理系統,hadoop以其吞吐量大、自動容錯等優點,在海量資料處理上得到了廣泛的使用。但是,hadoop不擅長實時計算,因為它天然就是為批處理而生的,這也是業界一致的共識。否則最近這兩年也不會有s4,storm,puma這些實時計算系統如雨後春筍般冒出來啦。先拋開s4,storm,puma這些系統不談,我們首先來看一下,如果讓我們自己設計一個實時計算系統,我們要解決哪些問題。
- 低延遲。都說了是實時計算系統了,延遲是一定要低的。
- 高效能。效能不高就是浪費機器,浪費機器是要受批評的哦。
-
分散式。系統都是為應用場景而生的,如果你的應用場景、你的資料和計算單機就能搞定,那麼不用考慮這些複雜的問題了。我們所說的是單機搞不定的情況。
- 可擴充套件。伴隨著業務的發展,我們的資料量、計算量可能會越來越大,所以希望這個系統是可擴充套件的。
- 容錯。這是分散式系統中通用問題。一個節點掛了不能影響我的應用。
好,如果僅僅需要解決這5個問題,可能會有無數種方案,而且各有千秋,隨便舉一種方案,使用訊息佇列+分佈在各個機器上的工作程序就ok啦。我們再繼續往下看。
- 容易在上面開發應用程式。親,你設計的系統需要應用程式開發人員考慮各個處理元件的分佈、訊息的傳遞嗎?如果是,那有點麻煩啊,開發人員可能會用不好,也不會想去用。
-
訊息不丟失。使用者釋出的一個寶貝訊息不能在實時處理的時候給丟了,對吧?更嚴格一點,如果是一個精確資料統計的應用,那麼它處理的訊息要不多不少才行。這個要求有點高哦。
- 訊息嚴格有序。有些訊息之間是有強相關性的,比如同一個寶貝的更新和刪除操作訊息,如果處理時搞亂順序完全是不一樣的效果了。
不知道大家對這些問題是否都有了自己的答案,下面讓我們帶著這些問題,一起來看一看storm的解決方案吧。
- Storm是什麼
如果只用一句話來描述storm的話,可能會是這樣:分散式實時計算系統。按照storm作者的說法,storm對於實時計算的意義類似於hadoop對於批處理的意義。我們都知道,根據google mapreduce來實現的hadoop為我們提供了map, reduce原語,使我們的批處理程式變得非常地簡單和優美。同樣,storm也為實時計算提供了一些簡單優美的原語。我們會在第三節中詳細介紹。
我們來看一下storm的適用場景。
- 流資料處理。Storm可以用來處理源源不斷流進來的訊息,處理之後將結果寫入到某個儲存中去。
- 分散式rpc。由於storm的處理元件是分散式的,而且處理延遲極低,所以可以作為一個通用的分散式rpc框架來使用。當然,其實我們的搜尋引擎本身也是一個分散式rpc系統。
說了半天,好像都是很玄乎的東西,下面我們開始具體講解storm的基本概念和它內部的一些實現原理吧。
- Storm的基本概念
首先我們通過一個 storm 和hadoop的對比來了解storm中的基本概念。
Hadoop | Storm | |
系統角色 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
Child | Worker | |
應用名稱 | Job | Topology |
元件介面 | Mapper/Reducer | Spout/Bolt |
表3-1
接下來我們再來具體看一下這些概念。
- Nimbus:負責資源分配和任務排程。
- Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker程序。
- Worker:執行具體處理元件邏輯的程序。
- Task:worker中每一個spout/bolt的執行緒稱為一個task. 在storm0.8之後,task不再與物理執行緒對應,同一個spout/bolt的task可能會共享一個物理執行緒,該執行緒稱為executor。
- Topology:storm中執行的一個實時應用程式,因為各個元件間的訊息流動形成邏輯上的一個拓撲結構。
- Spout:在一個topology中產生源資料流的元件。通常情況下spout會從外部資料來源中讀取資料,然後轉換為topology內部的源資料。Spout是一個主動的角色,其介面中有個nextTuple()函式,storm框架會不停地呼叫此函式,使用者只要在其中生成源資料即可。
- Bolt:在一個topology中接受資料然後執行處理的元件。Bolt可以執行過濾、函式操作、合併、寫資料庫等任何操作。Bolt是一個被動的角色,其介面中有個execute(Tuple input)函式,在接受到訊息後會呼叫此函式,使用者可以在其中執行自己想要的操作。
- Tuple:一次訊息傳遞的基本單元。本來應該是一個key-value的map,但是由於各個元件間傳遞的tuple的欄位名稱已經事先定義好,所以tuple中只要按序填入各個value就行了,所以就是一個value list.
- Stream:源源不斷傳遞的tuple就組成了stream。
10. stream grouping:即訊息的partition方法。Storm中提供若干種實用的grouping方式,包括shuffle, fields hash, all, global, none, direct和localOrShuffle等
相比於s4, puma等其他實時計算系統,storm最大的亮點在於其記錄級容錯和能夠保證訊息精確處理的事務功能。下面就重點來看一下這兩個亮點的實現原理。
- Storm記錄級容錯的基本原理
首先來看一下什麼叫做記錄級容錯?storm允許使用者在spout中發射一個新的源tuple時為其指定一個message id, 這個message id可以是任意的object物件。多個源tuple可以共用一個message id,表示這多個源 tuple對使用者來說是同一個訊息單元。storm中記錄級容錯的意思是說,storm會告知使用者每一個訊息單元是否在指定時間內被完全處理了。那什麼叫做完全處理呢,就是該message id繫結的源tuple及由該源tuple後續生成的tuple經過了topology中每一個應該到達的bolt的處理。舉個例子。在圖4-1中,在spout由message
1繫結的tuple1和tuple2經過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被完全處理了。
圖4-1
在storm的topology中有一個系統級元件,叫做acker。這個acker的任務就是追蹤從spout中流出來的每一個message id繫結的若干tuple的處理路徑,如果在使用者設定的最大超時時間內這些tuple沒有被完全處理,那麼acker就會告知spout該訊息處理失敗了,相反則會告知spout該訊息處理成功了。在剛才的描述中,我們提到了”記錄tuple的處理路徑”,如果曾經嘗試過這麼做的同學可以仔細地思考一下這件事的複雜程度。但是storm中卻是使用了一種非常巧妙的方法做到了。在說明這個方法之前,我們來複習一個數學定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。
storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會為使用者指定的message id生成一個對應的64位整數,作為一個root id。root id會傳遞給acker及後續的bolt作為該訊息單元的唯一標識。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple之後,會告知acker自己發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完之後,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker只需要對這些id做一個簡單的異或運算,就能判斷出該root
id對應的訊息單元是否處理完成了。下面通過一個圖示來說明這個過程。
圖4-1 spout中繫結message 1生成了兩個源tuple,id分別是0010和1011.
圖4-2 bolt1處理tuple 0010時生成了一個新的tuple,id為0110.
圖4-3 bolt2處理tuple 1011時生成了一個新的tuple,id為0111.
圖4-4 bolt3中接收到tuple 0110和tuple 0111,沒有生成新的tuple.
可能有些細心的同學會發現,容錯過程存在一個可能出錯的地方,那就是,如果生成的tuple id並不是完全各異的,acker可能會在訊息單元完全處理完成之前就錯誤的計算為0。這個錯誤在理論上的確是存在的,但是在實際中其概率是極低極低的,完全可以忽略。
- Storm的事務拓撲
事務拓撲(transactional topology)是storm0.7引入的特性,在最近釋出的0.8版本中已經被封裝為Trident,提供了更加便利和直觀的介面。因為篇幅所限,在此對事務拓撲做一個簡單的介紹。
事務拓撲的目的是為了滿足對訊息處理有著極其嚴格要求的場景,例如實時計算某個使用者的成交筆數,要求結果完全精確,不能多也不能少。Storm的事務拓撲是完全基於它底層的spout/bolt/acker原語實現的,通過一層巧妙的封裝得出一個優雅的實現。個人覺得這也是storm最大的魅力之一。
事務拓撲簡單來說就是將訊息分為一個個的批(batch),同一批內的訊息以及批與批之間的訊息可以並行處理,另一方面,使用者可以設定某些bolt為committer,storm可以保證committer的finishBatch()操作是按嚴格不降序的順序執行的。使用者可以利用這個特性通過簡單的程式設計技巧實現訊息處理的精確。
- Storm在淘寶
由於storm的核心是clojure編寫的(不過大部分的拓展工作都是Java編寫的),為我們理解它的實現帶來了一定的困難,好在大部分情況下storm都比較穩定,當然我們也在盡力熟悉clojure的世界。我們在使用storm時通常都是選擇java語言開發應用程式。
在淘寶,storm被廣泛用來進行實時日誌處理,出現在實時統計、實時風控、實時推薦等場景中。一般來說,我們從類kafka的metaQ或者基於Hbase的timetunnel中讀取實時日誌訊息,經過一系列處理,最終將處理結果寫入到一個分散式儲存中,提供給應用程式訪問。我們每天的實時訊息量從幾百萬到幾十億不等,資料總量達到TB級。對於我們來說,storm往往會配合分散式儲存服務一起使用。在我們正在進行的個性化搜尋實時分析專案中,就使用了timetunnel + hbase + storm + ups的架構,每天處理幾十億的使用者日誌資訊,從使用者行為發生到完成分析延遲在秒級。
- Storm的未來
Storm0.7系列的版本已經在各大公司得到了廣泛使用,最近釋出的0.8版本中引入了State,使得其從一個純計算框架演變成了一個包含儲存和計算的實時計算新利器,還有剛才提到的Trident,提供更加友好的介面,同時可定製scheduler的特性也為其針對不同的應用場景做優化提供了更便利的手段,也有人已經在基於storm的實時ql(query language)上邁出了指令碼。在服務化方面,storm一直在朝著融入mesos框架的方向努力。同時,storm也在實現細節上不斷地優化,使用很多優秀的開源產品,包括kryo, Disruptor, curator等等。可以想象,當storm發展到1.0版本時,一定是一款無比傑出的產品,讓我們拭目以待,當然,最好還是參與到其中去吧,同學們。
- 參考文獻
相關推薦
分散式流式計算框架 Storm
場景 伴隨著資訊科技日新月異的發展,資訊呈現出爆發式的膨脹,人們獲取資訊的途徑也更加多樣、更加便捷,同時對於資訊的時效性要求也越來越高。舉個搜尋場景中的例子,當一個賣家釋出了一條寶貝資訊時,他希望的當然是這個寶貝馬上就可以被賣家搜尋出來、點選、購買啦,相反,如果這個寶貝要
storm 流式計算框架
大數據 storm 流式計算 一:storm 簡介 二:storm 的原理與架構 三:storm 的 安裝配置 四:storm 的啟動腳本 一: storm 的簡介: 1.1 storm 是什麽: 1. Storm是Twitter開源的分布式實時大數據處理框架,被業界稱為實時版Hadoo
流式處理框架storm淺析
min 完成 文檔 efault 生成 沒有 ado 編程 現在 前言前一段時間參與哨兵流式監控功能設計,調研了兩個可以做流式計算的框架:storm和spark streaming,我負責storm的調研工作。斷斷續續花了一周的時間看了官網上的doc和網絡上的一些資料。我把
Flink 流式計算框架(學習一)
開源流計算引擎,兼顧效能和可靠性。 Flink資料集型別 有邊資料集:最終不再發生改變 無邊資料集
【線上直播】Flink—新一代流式計算框架
分享講師:黃躍峰
流式計算--storm1(storm概念初識)
1.Storm是什麼? Storm用來實時處理資料,特點:低延遲、高可用、分散式、可擴充套件、資料不丟失。提供簡單容易理解的介面,便於開發。 2.Storm與Hadoop的區別? Storm用於實時計算,Hadoop用於離線計算。 Storm處理的資料儲存在
流式計算--storm3(Storm單詞技術案例)
功能說明:設計一個topology,來實現對文件裡面的單詞出現的頻率進行統計。本篇部落格是在storm概念講解和storm叢集搭建的基礎上來的 1.建立一個maven專案: 新增以來如下: <dependency>
流式計算框架調研
三大主流框架 對比 框架 特點 支援的語言 接入者 Apache Storm Clojure語言實現,訊息的最小單位為元祖,可增量計算,延遲最低(
流式計算框架之Strom基本概念
轉載於 http://www.cnblogs.com/xia520pi/p/4816507.html,謝謝 首先我們通過一個Storm和Hadoop的對比表格,來了解Storm中的基本概念。 接下來我們再來具體看一下這些概念。 Nimbus:負責資源分配和任務排
分散式流式計算平臺-S4
本文是作者在充分閱讀和理解Yahoo!最新發布的技術論文《S4:Distributed Stream Computing Platform》的基礎上,所做出的知識分享。 S4是Yahoo!在2010年10月開源的一套通用、分散式、可擴充套件、部分容錯、具備可插拔功能
流式計算之Storm簡介
Storm目前存在的問題 1. 目前的開源版本中只是單節點Nimbus,掛掉只能自動重啟,可以考慮實現一個雙nimbus的佈局。 2. Clojure是一個在JVM平臺執行的動態函數語言程式設計語言,優勢在於流程計算, Storm的部分核心內容由Clojure編寫,雖然效能上提高不少但同時也提升了維護成本
Storm簡介——實時流式計算介紹
大數據 bsp 要求 角度 size 計算 spa 流量 使用場景 概念 實時流式計算: 大數據環境下,流式數據將作為一種新型的數據類型,這種數據具有連續性、無限性和瞬時性。是實時數據處理所面向的數據類型,對這種流式數據的實時計算就是實時流式計算。 特
hadoop(十三)storm流式計算(實時處理)
storm介紹 說明+安裝文件 Storm是一個開源的分散式實時計算系統,可以簡單、可靠的處理大量的資料流。被稱作“實時的hadoop”。Storm有很多使用
大資料學習:storm流式計算
Storm是一個分散式的、高容錯的實時計算系統。Storm適用的場景: 1、Storm可以用來用來處理源源不斷的訊息,並將處理之後的結果儲存到持久化介質中。 2、由於Storm的處理元件都是分散式的,而且處理延遲都極低,所以可以Storm可以做為
流式計算--整合kafka+flume+storm
1.資料流向 日誌系統=>flume=>kafka=>storm 2.安裝flume 1.我們在storm01上安裝flume1.6.0,上傳安裝包 2.解壓到 /export/servers/flume,
【流式計算】Twitter Storm原始碼分析之Nimbus/Supervisor本地目錄結構
我們知道,storm叢集裡面工作機器分為兩種一種是nimbus, 一種是supervisor, 他們通過zookeeper來進行互動,nimbus通過zookeeper來發布一些指令,supervisor去讀zookeeper來執行這些指令,具體nimbus和supe
流式計算storm應用場景簡介
(1) storm是一個程序常駐記憶體的、分散式的,對資料實時流式處理框架,不同於MR的批處理和spark streaming的微批處理,storm實現了對資料處理的毫秒級延遲。它的資料來源被稱為SPOUT,資料處理流程被稱為BOLT (2)storm一般應用於對資料的處
Storm:流式處理框架之特性與應用場景
資料時代的今夕,如秋風席捲落葉漫天紛飛,storm願做繫鈴人解之庖丁。 一、是什麼 Storm是一個分散式的資料流處理系統。它會把工作任務委託給不同型別的元件,每個
Storm流式計算入門
流式計算 實時獲取資料,實時資料儲存,實時資料計算,實時結果快取,持久化儲存(mysql) 代表技術: Flume:實時獲取資料 Kafka:實時資料儲存 Storm/jstorm:實時資料計算 Redis:實時結果快取 總結:將源源不斷產生的資料
【流式計算】Twitter Storm原始碼分析之ZooKeeper中的目錄結構
作者: xumingming | 可以轉載, 但必須以超連結形式標明文章原始出處和作者資訊及版權宣告 我們知道Twitter Storm的所有的狀態資訊都是儲存在Zookeeper裡面,nimbus通過在zookeeper上面寫狀態資訊來分配任務,supervisor