1. 程式人生 > >從Storm到Flink:大資料處理的開源系統及程式設計模型(文末福利)

從Storm到Flink:大資料處理的開源系統及程式設計模型(文末福利)

640?wx_fmt=jpeg

本文節選自CCF大資料教材系列叢書之《大資料處理》,本書由華中科技大學金海教授主編,包括大資料處理基礎技術、大資料處理程式設計與典型應用處理、大資料處理系統與優化三個方面。本教材以大資料處理程式設計為核心,從基礎、程式設計到優化等多個方面對大資料處理技術進行系統介紹,使得讀者能夠快速入門,同時體會大資料處理系統的設計理念與優化方法本質。


開源系統及程式設計模型


基於流計算的基本模型,當前已有各式各樣的分散式流處理系統被開發出來。本節將對當前開源分散式流處理系統中三個最典型的代表性的系統:Apache Storm,Spark Streaming,Apache Flink以及它們的程式設計模型進行詳細介紹。


 Apache Storm


Apache Storm是由Twitter公司開源的一個實時分散式流處理系統[2],被廣泛應用在實時分析、線上機器學習連續計算、分散式RPC、ETL等場景。Storm支援水平擴充套件、具有高容錯性,保證資料能被處理,而且處理速度很快。Storm支援多種程式語言,易於部署和管理,是目前廣泛使用的流處理系統之一。


一、Storm中的資料封裝


Storm系統可以從分散式檔案系統(如HDFS)或分散式訊息佇列(如Kafka)中獲取源資料,並將每個流資料元組封裝稱為tuple。一條資料流即是一個無邊界的tuple序列,而這些tuple序列可以以分散式的方式建立和處理。在Storm中,資料流中的每個tuple相互獨立,彼此間的處理上不存在任何關聯。Tuple也是Storm中訊息傳遞的基本單元,其資料結構如圖5-3-1所示。


640?wx_fmt=png


如圖5-3-1所 示, 一 個tuple可以包含多個欄位(field),每個欄位代表對應流資料的一個屬性,在Storm的每個操作元件傳送向下遊傳送tuple時,會宣告對應tuple每個欄位的順序和代表的含義(如資料的鍵、值、時間戳等)。


二、Storm中的應用拓撲建立


在Storm中, 用 戶 所 提 交 的 應 用 所 構 建 的DAG拓撲被稱為Topology。Storm的Topology類似於MapReduce中的一個job,但區別在於這個拓撲會永遠執行(或者直到手動結束)。每個Topology中有兩個重要元件:spout和bolt。


spout是Topology中資料流的來源,也即對應DAG模型中的起始

操作。spout可以從外部源讀取資料並將其以封裝成tuple的形式傳送到圖 5-3-1 tuple的資料結構Topology中。bolt是Topology中對tuple進行處理的主要單元。Storm並不區分中間和終止操作,而是將其統一為bolt來進行實現,也即對結果的輸出需要由使用者自己來實現。所有對流資料的處理都是在bolt中實現,bolt可以執行各種基礎操作,如過濾、聚合、連線等。bolt每處理完一個tuple後,可以按照應用需求傳送給0個或多個tuple給下游的bolt。


三、Storm中的並行度指定


Storm中的並行度有三層含義。首先是worker程序數。Storm可以建立在分散式叢集上,每臺物理節點可以發起一個或多個worker程序。


一個worker對應一個物理的JVM(Java虛擬機器)。通常,整個Topology會由一個或者多個worker程序來負責執行。每個worker會在一個JVM中執行一個或多個executor,每個executor對應一個執行緒,執行某一個spout或者bolt的計算任務。在Storm中,每個spout/bolt都可以例項化生成多個task在叢集中執行,一般預設情況下,executor數與task數一一對應,也即每個例項都由一個單獨的執行緒來執行。使用者也可以指定task數大於executor數,這時部分task會由同一個執行緒迴圈呼叫來執行。在Storm的Topology建立時,使用者可以根據需要依次來設定整體的worker程序數以及每個spout/bolt對應的executor數和task數。


四、Storm中的資料分組和傳輸


使用者可以通過定義分組策略(streaming grouping)來決定資料流如何在不同的spout/bolt的task中進行分發和傳輸。分組策略將所有的spout和bolt連線起來構成一個Topology,如圖5-3-2所示。除了5.2.4節所介紹的幾種基本分組策略外,Storm還支援其他的分組策略。例如local grouping,這是shuffle grouping的 一 種 變 種 分 組 策 略。由於Storm劃分多個worker程序,shuffle grouping可能導致大量的程序間通訊,local grouping則是將元組優先發往與自己同進程的下游task中,若沒有這種下游task,才繼續沿用shuffle grouping的方式。


640?wx_fmt=png

圖 5-3-2 streaming grouping


640?wx_fmt=png

圖 5-3-3 Storm系統架構


又如direct grouping,這是一種特殊的分組方式,使用者可以直接指定由下游的哪一個task來接收資料。


五、Storm的分散式系統架構


Storm可以執行在分散式叢集上。Storm叢集結構沿用了主從架構方式,即一個主控節點和多個工作節點。圖5-3-3展示了整個Storm的系統架構。


Storm的基本元件分別如下。


Nimbus:執行Nimbus的節點是Storm叢集的主控節點,Nimbus類似Hadoop中JobTracker的角色,是使用者和Storm系統之間的互動點。Nimbus主要的工作是用於使用者提交Topology、進行叢集任務的分配排程、進行叢集監控和統計等。


Supervisor:每個工作節點上都執行著一個Supervisor,Supervisor用於接收Nimbus分配的任務,並根據分配產生worker程序。同時它還會監視worker的健康狀況,在必要的情況下會重啟worker程序。


ZooKeeper:Storm系統借用Zookeeper叢集來進行Nimbus和Supervisor之間的所有協調工作,包括Nimbus對Supervisor所執行的任務的調配,以及幫助Nimbus監控所有Supervisor和task的執行情況,以便失效時迅速重啟。


六、Storm的程式設計示例


下面,以一個簡單的WordCount應用為例,來進行Storm程式設計模型的講解。


(1)實現生成資料的spout,封裝資料首先構建一個CreateSentenceSpout來進行資料流的生成。為了簡化說明,從若干給定的靜態句子列表中每次隨機抽取一句作為一個tuple來傳遞給下游bolt進行處理。CreateSentenceSpout的具體實現如程式碼5-3-1所示。


640?wx_fmt=png


以上程式碼中,BaseRichSpout類是Storm提供的一個簡單介面,使用它可以預設實現很多方法,使使用者只用關心實現應用所需要的程式碼上去。


在spout中最主要的工作就是資料的封裝。Spout的核心程式碼在nextTuple( )方法中實現,即如何產生所需的tuple並進行傳輸。Spout會迴圈呼叫此方法來不斷產生新的tuple。在本例中,從open( )方法裡給定的句子列表中隨機抽取一條作為tuple,並通過emit方法將tuple進行傳輸。


在emit生成tuple時,還需要對tuple中的每個欄位進行宣告。這是由declareOutputFields( )方法來實現。這個方法是Storm中所有spout/bolt都需要實現的一個方法。在本例中,生成的每個句子對應一個tuple,其只具有一個欄位,欄位的值就是句子本身,因此在declareOutputFields( )中宣告欄位只有一個“sentence”。open( )方法是對應元件在進行初始化時執行的方法,其中要注意的是open( )方法會接收SpoutOutputCollector物件所提供的後續tuple傳輸方法作為引數,因此在open( )方法的實現中,需要將其引用儲存在一個變數當中,以便nextTuple( )方法呼叫。


(2)實現對流資料進行操作處理的bolt


在WordCount應用中,對spout生成的句子,構建兩個bolt來進行處理:一個SplitWordBolt來將句子劃分為單詞,一個CountBolt來對劃分好的單詞進行累計計數。下面,以SplitWordBolt為例來進行講解,其實現程式碼如程式碼5-3-2所示。


640?wx_fmt=png


BasicRichBolt類同樣是Storm對於bolt類提供的一個簡單介面,使使用者能夠僅集中編寫所需的操作邏輯即可。


Bolt的核心是execute( )方法。每當接收到一個新的tuple,都會直接對此方法進行呼叫,然後執行。使用getStringByField( )方法可以讀取在上游元件生成tuple時宣告的對應欄位裡的值。當完成處理後,如果新產生的tuple需要繼續向後傳輸,可以通過呼叫emit方法對tuple進行傳送。


prepare( )方法與spout中 的open( )方 法 功 能 相 似。Declare-OutputFields( )方法與spout( )中相同,這裡不再贅述。


(3)構建流應用Topology,並指明並行度和分組策略


實現了對應的spout和bolt功能之後,最後就是將其連線成一個完整的Topology。本例中Topology的程式碼如程式碼5-3-3所示。


640?wx_fmt=png


以上程式碼中,首先生成了TopologyBuilder的一個例項,然後分別對應生成spout和bolt的各個例項。在setSpout和setBolt方法中,第一個引數為對應的元件註冊了ID,第二個引數生成對應元件的例項,而第三個引數為對應元件需要生成的executor個數。


在setBolt方法中,除了對應生成例項外,還需要指定每個bolt需要接收哪個元件傳送給自己的資料,以及資料的傳送方式,即分組策略。例如本例中,CountBolt需要從SplitWordBolt處接收資料,SplitWordBolt傳送的資料以fields grouping( 同key grouping) 的方式進行傳送,其中用於分組的鍵值為SplitWordBolt傳送tuple的“word”欄位的值。


最後,可以自由指定程式的並行度。可以使用setNumWorkers方法來指定用於執行此Topology中worker程序的個數,本例中為整個Topology分配了4個worker程序;可以用setSpout和setBolt方法中的第三個引數指定executor數量,而若需要指定更多的task數,則可以繼續使用setNumTasks進行設定。本例為每個spout/bolt都生成了4個executor,而進一步為SplitWordBolt分配了8個task,這使得每2個task由一個executor執行緒來負責執行。


Spark Streaming


Spark Streaming是Spark API核心擴充套件,提供對實時資料流進行流式處理,具備可擴充套件、高吞吐和容錯等特性。Spark Streaming支援從多種資料來源中提取資料,例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,並提供了一些高階的API來表示複雜處理演算法,如map、reduce、join、windows等,最後可以將得到的結果儲存到分散式檔案系統(如HDFS)、資料庫或者其他輸出,Spark的機器學習和圖計算的演算法也可以應用於Spark Streaming的資料流中。


一、Spark Streaming中的資料封裝


和Storm不同的是,Spark Streaming本質上是一個典型的微批處理系統,其與以元組為單位進行流式處理不同,它將無盡的資料流按時間切分為連續的小批次資料,然後以傳統的批處理方法來進行快速連續的處理。


在Spark Streaming中,資料流被抽象成以時間片段分隔開的離散流(discretized stream)形式。簡單而言,就是將所有的流資料按照一定的批大小(如1秒)分割成一段又一段的小批次資料,如圖5-3-4所示。Spark Streaming使用Spark引擎,將每一段小批次資料轉化成為Spark當中的RDD(彈性分散式資料集)。流資料即以RDD的形式在Spark Streaming系統中進行運算。


640?wx_fmt=png

圖 5-3-4 Spark Streaming的離散流


二、Spark Streaming中的應用拓撲建立


Spark Streaming同樣在系統中構建出DAG的處理模型。不過與Storm不同,Spark Streaming並不使用固定的處理單元來執行單一的操作。實際上,Spark Streaming中的DAG與Spark Core中的DAG相同,只是用DAG的形式將每一個時間分片對應的RDD進行運算的job來進一步劃分成任務集stage,以便進行高效的批處理。Spark Streaming沿用了Spark Core對RDD提供的transformation操作,將所有RDD依次進行轉換,應用邏輯分別進行轉換處理,進而實現對整個離散流的轉換。


圖5-3-5展示了Spark Streaming的整體計算框架,一方面線上輸入的資料流被按照時間切分為若干小批次資料並被轉化成為RDD儲存在記憶體中,另一方面,根據流應用邏輯,也即流處理引用抽象出DAG拓撲,制定出相應的RDD transformation。RDD不斷被批量執行transformation操作,直到產生最終的結果。


640?wx_fmt=png

圖 5-3-5 Spark Streaming 計算框架[7]


三、Spark Streaming中的並行度指定


由於Spark Streaming本質上是將資料流的任務劃分成為大量的微批資料,對應多個job來執行,所以Spark Streaming的並行度設定與Spark進行批處理時的設定一樣,只能設定整體job的並行度,而不能對每個操作單獨的並行度進行設定。然而由於批處理的特性,Spark Streaming可以最大化對系統並行能力的利用,也能獲得相對更高的系統吞吐率。


四、Spark Streaming中的資料分組和傳輸


由於使用微批處理技術,Spark Streaming的資料被打包為一個個微批,而每個微批相互獨立地進行處理,所以不涉及所提到的資料分組與傳輸問題。但這也展現出微批處理的一個侷限性,其難以靈活處理基於使用者自定義的視窗的聚合、計數等操作,也不能進行鍼對資料流的連續計算,如兩個資料流的實時連線等操作。


五、Spark Streaming的系統框架


Spark Streaming建立在Spark系統之上,其系統架構相對於Spark的修改和新增部分如圖5-3-6所示。


640?wx_fmt=png

圖 5-3-6 Spark Streaming 基於Spark修改和新增的元件[7]


除開Spark系統本身元件外,Spark Streaming主要元件如下。


master:是Spark Streaming中流應用的入口。根據應用邏輯產生用於轉換RDD的task然後進行排程,並對這些task進行追蹤。D-Stream lineage包含了離散流間的轉換關係,類似流應用的DAG圖。


client:Spark Streaming建立了一個client庫來將資料傳入到系統當中。


worker:是Spark Streaming中流資料的入口以及執行RDD轉換的主要元件。相對於Spark,主要新增了input receiver對流資料進行獨立的接收。流資料可以是從系統外線上地進行讀取進來,並轉化為離散流的形式,也可以是經過其他execution執行轉化後的離散流。


六、Spark Streaming的程式設計示例


Spark Streaming的程式設計較為簡單,這是由於它本身基於Spark建立,有豐富的API可以呼叫,可以省去大量無關的編碼。下面同樣以WordCount應用為例來對Spark Streaming的程式設計模型進行說明。


(1)離散流的輸入和資料封裝


在WordCount應用中,假定直接從一個socket來獲取源源不斷的句子資料流,那麼資料流的輸入具體實現如程式碼5-3-4所示。


640?wx_fmt=png


以上程式碼中,首先建立了JavaStreamingContext物件,同時需要指定劃分離散流的時間間隔。本例中指定了每隔1s就劃分一次微批。接著,指定從埠8888的socket中持續獲取資料流。通過以上程式碼,每個executor獲取的資料流就會根據1s的時間間隔不斷劃分成小批次,並進一步轉化為RDD。這一串RDD的組合即是新產生的“lines”離散流。


(2)建立應用拓撲,進行離散流的轉化


離散流的轉化即根據相應的應用邏輯指定對應的RDD的轉化方式。在WordCount應用中,先將句子轉化為若干的單詞,然後將每個單詞變成(單詞,計數)的二元對,最後對相同單詞的二元對計數進行累加。具體實現如程式碼5-3-5所示。


640?wx_fmt=png

640?wx_fmt=png


以上程式碼中,利用Spark豐富的transformation方法,將由一個個句子組成的“lines”離散流首先通過flatMap的方式對映為由單片語成的“words”離散流。進一步通過mapToPair的方式對映為(單詞,計數)二元對組成的“pairs”離散流,這裡每個單詞沒有累加前,計數值就直接等於1。最後通過reduceByKey的方式,對相同單詞的計數進行累加操作。


 Apache Flink


Apache Flink是一個同時支援分散式資料流處理和資料批處理的大資料處理系統。其特點是完全以流處理的角度出發進行設計,而將批處理看作是有邊界的流處理特殊流處理來執行。Flink可以表達和執行許多類別的資料處理應用程式,包括實時資料分析、連續資料管道、歷史資料處理(批處理)和迭代演算法(機器學習、圖表分析等)。


Flink同樣是使用單純流處理方法的典型系統,其計算框架與原理和Apache Storm比較相似。Flink做了許多上層的優化,也提供了豐富的API供開發者能更輕鬆地完成程式設計工作。


一、Flink中的資料封裝


Flink能夠支撐對多種型別的資料進行處理,例如Flink支撐任意的Java或者Scala型別,這使得Flink使用更加靈活。類似Storm,Flink同樣也可以使用多欄位的tuple為其基本資料單元。Flink可以支援了多種Flink tuple型別(tuple1至tuple25),每種tuple都是一個固定長度的物件序列。


二、Flink中的應用拓撲建立


Flink中核心概念為資料流(stream)和轉換(transformation)。每個轉換對應的是一個簡單的操作,根據應用邏輯,轉換按先後順序構成了流應用的DAG圖,如圖5-3-7所示。資料流在轉換之間傳遞,直到完成所有的轉換進行輸出。Flink應用包含明確的源操作和匯聚操作,用於資料的輸入與輸出。


640?wx_fmt=png


Flink內 部 實 現 了 許 多 基 本 的 轉 換 操 作, 比 如Map、FlatMap、Reduce、Window等, 同 時 也 實 現 了 許 多 源 和 匯 聚 操 作, 比 如writeAsText、writeAsCsv、print等。Flink提供了豐富的API以簡化用戶對應用拓撲的編寫和表達。


三、Flink中的並行度指定


與Storm相似,Flink程式的計算框架本質上也並行分散式的。在系統中,一個流包含一個或多個流分割槽,而每一個轉換操作包含一個或多個子任務例項。操作的子任務間彼此獨立,以不同的執行緒執行,可以執行在不同的機器或容器上。


一個Flink應用同樣執行在一個或多個worker程序當中。一個worker中生成一個或多個task slot。每個task slot用以承載和執行Flink每個轉換操作的一個子任務例項。Flink可以指定全域性的task slot數目作為其最大的並行度。同時若部分轉換不需要使用如此多資源,Flink也可以指定每一操作具體的子任務數。每個轉換操作對應的子任務預設輪詢地分佈在分配的task slot內。


四、Flink中的資料分組與傳輸


Flink的資料分組方法主要包括一對一(one-to-one)模式或者重分組(redistributing)模式兩種。


採用一對一模式時,資料流中元素的分組和順序會保持不變,也就是說,對於上下游的兩個不同的轉換操作,下游任一子任務內要處理的元組資料,與上游相同順序的子任務所處理的元組資料完全一致。


採用重分組模式則會改變資料流所在的分組。重分組後元組的目標子任務根據處理的變換方法不同而發生改變。例如經過keyBy( )轉化,元組就會根據keyBy( )的引數選擇對應的欄位作為key值,進行雜湊計算來重新分組。經過broadcast( )轉化即相應地進行廣播等。


五、Flink的系統框架


圖5-3-8顯示了Apache Flink的分散式執行環境架構。


Flink的系統架構中包含以下重要元件。


jobclinet:jobclient是一個獨立的程式執行入口。job client負責接收使用者提交的程式,並將使用者提交的程式通過優化器和graph builder轉換成dataflow graph(類似流應用的DAG圖)。然後將生成的data flow提交給job manager進行job的管理和排程。一旦執行完成,job client返回給使用者最後的執行結果。


jobmanager:對應一個Flink程式的master程序,負責job的管理和資源的協調。主要包括任務排程、監控任務的執行狀態、協調任務的執行、檢查點管理和失敗恢復等。


640?wx_fmt=png

圖 5-3-8 Apache Flink分 布 式執行環境[9]


taskmanager和taskslot:是Flink中具體負責執行tasks的元件。每個taskmanage對應是執行在節點上的JVM程序,擁有一定的量的資源。比如記憶體、CPU、網路、磁碟等。每個執行的task執行在其中的一個或多個執行緒中。taskslot是分散式程式真正執行task的地方。每個taskslot可以包括JVM程序中的一部分記憶體。


六、Flink的程式設計示例


Flink的程式設計核心也就在 數 據 流 和 轉 換 上。 下 面, 依 然 以WordCount為例來對Flink的程式設計模型進行說明。程式碼5-3-6是Flink中以5分鐘為視窗進行一次求和統計的WordCount應用程式碼。


640?wx_fmt=png


在以上程式碼中,定義了一個DataStream例項,並通過socket的方式從8888埠監聽線上獲取資料。監聽到的句子資料被使用flatmap轉化成單詞,並直接以(單詞,計數)二元對的形式記錄下來。當流被轉化為二元對後,接著根據當前第0位的欄位“word”進行keyBy( )的操作,最後以5分鐘為視窗大小,對計數值進行累計。


Flink的程式設計非常簡潔和直觀,上例中,DataStream從源操作從socket線上讀取資料,到各種轉換操作,到最後的匯聚求和操作都可以直接表達出來。Flink提供了豐富的API和各種表達上的簡化來降低使用者的程式設計難度和程式設計量。


上例通過使用env.setParallelism來設定流處理程式的整體並行度,即taskslot數量為8。同時,可以進一步為每一個操作設定並行度,如在saveAsText( )操作後通過使用setParallelism將這個操作的並行度修改為1。


2018 中國大資料技術大會

BDTC 2018


BDTC 2018中國大資料技術大會攜主題“大資料新應用”再度強勢來襲。現在購票即有可能獲得華中科技大學教授金海主編的《大資料處理》一書,數量有限哦~掃描下方二維碼或點選【閱讀原】快速購票。


640?wx_fmt=jpeg


推薦閱讀

專訪百度熊輝:有人轉AI純粹因為好找工作,這樣的人不是我想要的

2018最後一戰:25天程式設計PK賽!

牛!程式設計師女友4招拯救了我髮際線!

AWS Lambda重大更新,跨越程式語言差異之門?

程式設計師婚戀現狀大調查:有人三十歲沒談過戀愛,有人丁克萬歲

這個女生說:弄懂本文前,你所知道的區塊鏈可能都是錯的

GitHub 的“封神”之路!