1. 程式人生 > >Storm學習(一) Storm介紹

Storm學習(一) Storm介紹

  • 場景

伴隨著資訊科技日新月異的發展,資訊呈現出爆發式的膨脹,人們獲取資訊的途徑也更加多樣、更加便捷,同時對於資訊的時效性要求也越來越高。舉個搜尋場景中的例子,當一個賣家釋出了一條寶貝資訊時,他希望的當然是這個寶貝馬上就可以被賣家搜尋出來、點選、購買啦,相反,如果這個寶貝要等到第二天或者更久才可以被搜出來,估計這個大哥就要罵娘了。再舉一個推薦的例子,如果使用者昨天在淘寶上買了一雙襪子,今天想買一副泳鏡去游泳,但是卻發現系統在不遺餘力地給他推薦襪子、鞋子,根本對他今天尋找泳鏡的行為視而不見,估計這哥們心裡就會想推薦你妹呀。其實稍微瞭解點背景知識的碼農們都知道,這是因為後臺系統做的是每天一次的全量處理,而且大多是在夜深人靜之時做的,那麼你今天白天做的事情當然要明天才能反映出來啦。

  • 實現一個實時計算系統

全量資料處理使用的大多是鼎鼎大名的hadoop或者hive,作為一個批處理系統,hadoop以其吞吐量大、自動容錯等優點,在海量資料處理上得到了廣泛的使用。但是,hadoop不擅長實時計算,因為它天然就是為批處理而生的,這也是業界一致的共識。否則最近這兩年也不會有s4,storm,puma這些實時計算系統如雨後春筍般冒出來啦。先拋開s4,storm,puma這些系統不談,我們首先來看一下,如果讓我們自己設計一個實時計算系統,我們要解決哪些問題。

  1. 低延遲。都說了是實時計算系統了,延遲是一定要低的。
  2. 高效能。效能不高就是浪費機器,浪費機器是要受批評的哦。
  3. 分散式。系統都是為應用場景而生的,如果你的應用場景、你的資料和計算單機就能搞定,那麼不用考慮這些複雜的問題了。我們所說的是單機搞不定的情況。
  4. 可擴充套件。伴隨著業務的發展,我們的資料量、計算量可能會越來越大,所以希望這個系統是可擴充套件的。
  5. 容錯。這是分散式系統中通用問題。一個節點掛了不能影響我的應用。

好,如果僅僅需要解決這5個問題,可能會有無數種方案,而且各有千秋,隨便舉一種方案,使用訊息佇列+分佈在各個機器上的工作程序就ok啦。我們再繼續往下看。

  1. 容易在上面開發應用程式。親,你設計的系統需要應用程式開發人員考慮各個處理元件的分佈、訊息的傳遞嗎?如果是,那有點麻煩啊,開發人員可能會用不好,也不會想去用。
  2. 訊息不丟失。使用者釋出的一個寶貝訊息不能在實時處理的時候給丟了,對吧?更嚴格一點,如果是一個精確資料統計的應用,那麼它處理的訊息要不多不少才行。這個要求有點高哦。
  3. 訊息嚴格有序。有些訊息之間是有強相關性的,比如同一個寶貝的更新和刪除操作訊息,如果處理時搞亂順序完全是不一樣的效果了。

不知道大家對這些問題是否都有了自己的答案,下面讓我們帶著這些問題,一起來看一看storm的解決方案吧。

  • Storm是什麼

如果只用一句話來描述storm的話,可能會是這樣:分散式實時計算系統。按照storm作者的說法,storm對於實時計算的意義類似於hadoop對於批處理的意義。我們都知道,根據google mapreduce來實現的hadoop為我們提供了map, reduce原語,使我們的批處理程式變得非常地簡單和優美。同樣,storm也為實時計算提供了一些簡單優美的原語。我們會在第三節中詳細介紹。

我們來看一下storm的適用場景。

  1. 流資料處理。Storm可以用來處理源源不斷流進來的訊息,處理之後將結果寫入到某個儲存中去。
  2. 分散式rpc。由於storm的處理元件是分散式的,而且處理延遲極低,所以可以作為一個通用的分散式rpc框架來使用。當然,其實我們的搜尋引擎本身也是一個分散式rpc系統。

說了半天,好像都是很玄乎的東西,下面我們開始具體講解storm的基本概念和它內部的一些實現原理吧。

  • Storm的基本概念

首先我們通過一個 storm 和hadoop的對比來了解storm中的基本概念。

Hadoop Storm
系統角色 JobTracker Nimbus
TaskTracker Supervisor
Child Worker
應用名稱 Job Topology
元件介面 Mapper/Reducer Spout/Bolt

表3-1

接下來我們再來具體看一下這些概念。

  1. Nimbus:負責資源分配和任務排程。
  2. Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker程序。
  3. Worker:執行具體處理元件邏輯的程序。
  4. Task:worker中每一個spout/bolt的執行緒稱為一個task. 在storm0.8之後,task不再與物理執行緒對應,同一個spout/bolt的task可能會共享一個物理執行緒,該執行緒稱為executor。

下面這個圖描述了以上幾個角色之間的關係。
系統結構
圖3-1

  1. Topology:storm中執行的一個實時應用程式,因為各個元件間的訊息流動形成邏輯上的一個拓撲結構。
  2. Spout:在一個topology中產生源資料流的元件。通常情況下spout會從外部資料來源中讀取資料,然後轉換為topology內部的源資料。Spout是一個主動的角色,其介面中有個nextTuple()函式,storm框架會不停地呼叫此函式,使用者只要在其中生成源資料即可。
  3. Bolt:在一個topology中接受資料然後執行處理的元件。Bolt可以執行過濾、函式操作、合併、寫資料庫等任何操作。Bolt是一個被動的角色,其介面中有個execute(Tuple input)函式,在接受到訊息後會呼叫此函式,使用者可以在其中執行自己想要的操作。
  4. Tuple:一次訊息傳遞的基本單元。本來應該是一個key-value的map,但是由於各個元件間傳遞的tuple的欄位名稱已經事先定義好,所以tuple中只要按序填入各個value就行了,所以就是一個value list.
  5. Stream:源源不斷傳遞的tuple就組成了stream。

10.  stream grouping:即訊息的partition方法。Storm中提供若干種實用的grouping方式,包括shuffle, fields hash, all, global, none, direct和localOrShuffle等

相比於s4, puma等其他實時計算系統,storm最大的亮點在於其記錄級容錯和能夠保證訊息精確處理的事務功能。下面就重點來看一下這兩個亮點的實現原理。

  • Storm記錄級容錯的基本原理

首先來看一下什麼叫做記錄級容錯?storm允許使用者在spout中發射一個新的源tuple時為其指定一個message id, 這個message id可以是任意的object物件。多個源tuple可以共用一個message id,表示這多個源 tuple對使用者來說是同一個訊息單元。storm中記錄級容錯的意思是說,storm會告知使用者每一個訊息單元是否在指定時間內被完全處理了。那什麼叫做完全處理呢,就是該message id繫結的源tuple及由該源tuple後續生成的tuple經過了topology中每一個應該到達的bolt的處理。舉個例子。在圖4-1中,在spout由message 1繫結的tuple1和tuple2經過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被完全處理了。
訊息傳遞
圖4-1

在storm的topology中有一個系統級元件,叫做acker。這個acker的任務就是追蹤從spout中流出來的每一個message id繫結的若干tuple的處理路徑,如果在使用者設定的最大超時時間內這些tuple沒有被完全處理,那麼acker就會告知spout該訊息處理失敗了,相反則會告知spout該訊息處理成功了。在剛才的描述中,我們提到了”記錄tuple的處理路徑”,如果曾經嘗試過這麼做的同學可以仔細地思考一下這件事的複雜程度。但是storm中卻是使用了一種非常巧妙的方法做到了。在說明這個方法之前,我們來複習一個數學定理。

A xor A = 0.

A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。

storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會為使用者指定的message id生成一個對應的64位整數,作為一個root id。root id會傳遞給acker及後續的bolt作為該訊息單元的唯一標識。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple之後,會告知acker自己發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完之後,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker只需要對這些id做一個簡單的異或運算,就能判斷出該root id對應的訊息單元是否處理完成了。下面通過一個圖示來說明這個過程。

圖4-1 spout中繫結message 1生成了兩個源tuple,id分別是0010和1011.

圖4-2 bolt1處理tuple 0010時生成了一個新的tuple,id為0110.

圖4-3 bolt2處理tuple 1011時生成了一個新的tuple,id為0111.

圖4-4 bolt3中接收到tuple 0110和tuple 0111,沒有生成新的tuple.

可能有些細心的同學會發現,容錯過程存在一個可能出錯的地方,那就是,如果生成的tuple id並不是完全各異的,acker可能會在訊息單元完全處理完成之前就錯誤的計算為0。這個錯誤在理論上的確是存在的,但是在實際中其概率是極低極低的,完全可以忽略。

  • Storm的事務拓撲

事務拓撲(transactional topology)是storm0.7引入的特性,在最近釋出的0.8版本中已經被封裝為Trident,提供了更加便利和直觀的介面。因為篇幅所限,在此對事務拓撲做一個簡單的介紹。

事務拓撲的目的是為了滿足對訊息處理有著極其嚴格要求的場景,例如實時計算某個使用者的成交筆數,要求結果完全精確,不能多也不能少。Storm的事務拓撲是完全基於它底層的spout/bolt/acker原語實現的,通過一層巧妙的封裝得出一個優雅的實現。個人覺得這也是storm最大的魅力之一。

事務拓撲簡單來說就是將訊息分為一個個的批(batch),同一批內的訊息以及批與批之間的訊息可以並行處理,另一方面,使用者可以設定某些bolt為committer,storm可以保證committer的finishBatch()操作是按嚴格不降序的順序執行的。使用者可以利用這個特性通過簡單的程式設計技巧實現訊息處理的精確。

  • Storm在淘寶

由於storm的核心是clojure編寫的(不過大部分的拓展工作都是java編寫的),為我們理解它的實現帶來了一定的困難,好在大部分情況下storm都比較穩定,當然我們也在盡力熟悉clojure的世界。我們在使用storm時通常都是選擇java語言開發應用程式。

在淘寶,storm被廣泛用來進行實時日誌處理,出現在實時統計、實時風控、實時推薦等場景中。一般來說,我們從類kafka的metaQ或者基於hbase的timetunnel中讀取實時日誌訊息,經過一系列處理,最終將處理結果寫入到一個分散式儲存中,提供給應用程式訪問。我們每天的實時訊息量從幾百萬到幾十億不等,資料總量達到TB級。對於我們來說,storm往往會配合分散式儲存服務一起使用。在我們正在進行的個性化搜尋實時分析專案中,就使用了timetunnel + hbase + storm + ups的架構,每天處理幾十億的使用者日誌資訊,從使用者行為發生到完成分析延遲在秒級。

  • Storm的未來

Storm0.7系列的版本已經在各大公司得到了廣泛使用,最近釋出的0.8版本中引入了State,使得其從一個純計算框架演變成了一個包含儲存和計算的實時計算新利器,還有剛才提到的Trident,提供更加友好的介面,同時可定製scheduler的特性也為其針對不同的應用場景做優化提供了更便利的手段,也有人已經在基於storm的實時ql(query language)上邁出了指令碼。在服務化方面,storm一直在朝著融入mesos框架的方向努力。同時,storm也在實現細節上不斷地優化,使用很多優秀的開源產品,包括kryo, Disruptor, curator等等。可以想象,當storm發展到1.0版本時,一定是一款無比傑出的產品,讓我們拭目以待,當然,最好還是參與到其中去吧,同學們。