從Storm到Flink:大數據處理的開源系統及編程模型
如圖5-3-1所 示, 一 個tuple可以包含多個字段(field),每個字段代表對應流數據的一個屬性,在Storm的每個操作組件發送向下遊發送tuple時,會聲明對應tuple每個字段的順序和代表的含義(如數據的鍵、值、時間戳等)。
二、Storm中的應用拓撲建立在Storm中, 用 戶 所 提 交 的 應 用 所 構 建 的DAG拓撲被稱為Topology。Storm的Topology類似於MapReduce中的一個job,但區別在於這個拓撲會永遠運行(或者直到手動結束)。每個Topology中有兩個重要組件:spout和bolt。spout是Topology中數據流的來源,也即對應DAG模型中的起始操作。spout可以從外部源讀取數據並將其以封裝成tuple的形式發送到圖 5-3-1 tuple的數據結構Topology中。bolt是Topology中對tuple進行處理的主要單元。Storm並不區分中間和終止操作,而是將其統一為bolt來進行實現,也即對結果的輸出需要由用戶自己來實現。所有對流數據的處理都是在bolt中實現,bolt可以執行各種基礎操作,如過濾、聚合、連接等。bolt每處理完一個tuple後,可以按照應用需求發送給0個或多個tuple給下遊的bolt。三、Storm中的並行度指定Storm中的並行度有三層含義。首先是worker進程數。Storm可以建立在分布式集群上,每臺物理節點可以發起一個或多個worker進程。一個worker對應一個物理的JVM(Java虛擬機)。通常,整個Topology會由一個或者多個worker進程來負責執行。每個worker會在一個JVM中運行一個或多個executor,每個executor對應一個線程,執行某一個spout或者bolt的計算任務。在Storm中,每個spout/bolt都可以實例化生成多個task在集群中運行,一般默認情況下,executor數與task數一一對應,也即每個實例都由一個單獨的線程來執行。用戶也可以指定task數大於executor數,這時部分task會由同一個線程循環調用來執行。在Storm的Topology建立時,用戶可以根據需要依次來設定整體的worker進程數以及每個spout/bolt對應的executor數和task數。四、Storm中的數據分組和傳輸用戶可以通過定義分組策略(streaming grouping)來決定數據流如何在不同的spout/bolt的task中進行分發和傳輸。分組策略將所有的spout和bolt連接起來構成一個Topology,如圖5-3-2所示。除了5.2.4節所介紹的幾種基本分組策略外,Storm還支持其他的分組策略。例如local grouping,這是shuffle?grouping的 一 種 變 種 分 組 策 略。由於Storm劃分多個worker進程,shuffle grouping可能導致大量的進程間通信,local grouping則是將元組優先發往與自己同進程的下遊task中,若沒有這種下遊task,才繼續沿用shuffle grouping的方式。
圖 5-3-2 streaming grouping
圖 5-3-3 Storm系統架構又如direct grouping,這是一種特殊的分組方式,用戶可以直接指定由下遊的哪一個task來接收數據。五、Storm的分布式系統架構Storm可以運行在分布式集群上。Storm集群結構沿用了主從架構方式,即一個主控節點和多個工作節點。圖5-3-3展示了整個Storm的系統架構。Storm的基本組件分別如下。Nimbus:運行Nimbus的節點是Storm集群的主控節點,Nimbus類似Hadoop中JobTracker的角色,是用戶和Storm系統之間的交互點。Nimbus主要的工作是用於用戶提交Topology、進行集群任務的分配調度、進行集群監控和統計等。Supervisor:每個工作節點上都運行著一個Supervisor,Supervisor用於接收Nimbus分配的任務,並根據分配產生worker進程。同時它還會監視worker的健康狀況,在必要的情況下會重啟worker進程。ZooKeeper:Storm系統借用Zookeeper集群來進行Nimbus和Supervisor之間的所有協調工作,包括Nimbus對Supervisor所執行的任務的調配,以及幫助Nimbus監控所有Supervisor和task的運行情況,以便失效時迅速重啟。六、Storm的編程示例下面,以一個簡單的WordCount應用為例,來進行Storm編程模型的講解。(1)實現生成數據的spout,封裝數據首先構建一個CreateSentenceSpout來進行數據流的生成。為了簡化說明,從若幹給定的靜態句子列表中每次隨機抽取一句作為一個tuple來傳遞給下遊bolt進行處理。CreateSentenceSpout的具體實現如代碼5-3-1所示。
以上代碼中,BaseRichSpout類是Storm提供的一個簡單接口,使用它可以默認實現很多方法,使用戶只用關心實現應用所需要的代碼上去。在spout中最主要的工作就是數據的封裝。Spout的核心代碼在nextTuple( )方法中實現,即如何產生所需的tuple並進行傳輸。Spout會循環調用此方法來不斷產生新的tuple。在本例中,從open( )方法裏給定的句子列表中隨機抽取一條作為tuple,並通過emit方法將tuple進行傳輸。在emit生成tuple時,還需要對tuple中的每個字段進行聲明。這是由declareOutputFields( )方法來實現。這個方法是Storm中所有spout/bolt都需要實現的一個方法。在本例中,生成的每個句子對應一個tuple,其只具有一個字段,字段的值就是句子本身,因此在declareOutputFields( )中聲明字段只有一個“sentence”。open( )方法是對應組件在進行初始化時執行的方法,其中要註意的是open( )方法會接收SpoutOutputCollector對象所提供的後續tuple傳輸方法作為參數,因此在open( )方法的實現中,需要將其引用保存在一個變量當中,以便nextTuple( )方法調用。(2)實現對流數據進行操作處理的bolt在WordCount應用中,對spout生成的句子,構建兩個bolt來進行處理:一個SplitWordBolt來將句子劃分為單詞,一個CountBolt來對劃分好的單詞進行累計計數。下面,以SplitWordBolt為例來進行講解,其實現代碼如代碼5-3-2所示。
BasicRichBolt類同樣是Storm對於bolt類提供的一個簡單接口,使用戶能夠僅集中編寫所需的操作邏輯即可。Bolt的核心是execute( )方法。每當接收到一個新的tuple,都會直接對此方法進行調用,然後執行。使用getStringByField( )方法可以讀取在上遊組件生成tuple時聲明的對應字段裏的值。當完成處理後,如果新產生的tuple需要繼續向後傳輸,可以通過調用emit方法對tuple進行發送。prepare( )方法與spout中 的open( )方 法 功 能 相 似。Declare-OutputFields( )方法與spout( )中相同,這裏不再贅述。(3)構建流應用Topology,並指明並行度和分組策略實現了對應的spout和bolt功能之後,最後就是將其連接成一個完整的Topology。本例中Topology的代碼如代碼5-3-3所示。
以上代碼中,首先生成了TopologyBuilder的一個實例,然後分別對應生成spout和bolt的各個實例。在setSpout和setBolt方法中,第一個參數為對應的組件註冊了ID,第二個參數生成對應組件的實例,而第三個參數為對應組件需要生成的executor個數。在setBolt方法中,除了對應生成實例外,還需要指定每個bolt需要接收哪個組件發送給自己的數據,以及數據的發送方式,即分組策略。例如本例中,CountBolt需要從SplitWordBolt處接收數據,SplitWordBolt發送的數據以fields grouping( 同key grouping) 的方式進行發送,其中用於分組的鍵值為SplitWordBolt發送tuple的“word”字段的值。最後,可以自由指定程序的並行度。可以使用setNumWorkers方法來指定用於執行此Topology中worker進程的個數,本例中為整個Topology分配了4個worker進程;可以用setSpout和setBolt方法中的第三個參數指定executor數量,而若需要指定更多的task數,則可以繼續使用setNumTasks進行設定。本例為每個spout/bolt都生成了4個executor,而進一步為SplitWordBolt分配了8個task,這使得每2個task由一個executor線程來負責執行。Spark StreamingSpark Streaming是Spark API核心擴展,提供對實時數據流進行流式處理,具備可擴展、高吞吐和容錯等特性。Spark Streaming支持從多種數據源中提取數據,例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,並提供了一些高級的API來表示復雜處理算法,如map、reduce、join、windows等,最後可以將得到的結果存儲到分布式文件系統(如HDFS)、數據庫或者其他輸出,Spark的機器學習和圖計算的算法也可以應用於Spark Streaming的數據流中。一、Spark Streaming中的數據封裝和Storm不同的是,Spark Streaming本質上是一個典型的微批處理系統,其與以元組為單位進行流式處理不同,它將無盡的數據流按時間切分為連續的小批次數據,然後以傳統的批處理方法來進行快速連續的處理。在Spark Streaming中,數據流被抽象成以時間片段分隔開的離散流(discretized stream)形式。簡單而言,就是將所有的流數據按照一定的批大小(如1秒)分割成一段又一段的小批次數據,如圖5-3-4所示。Spark Streaming使用Spark引擎,將每一段小批次數據轉化成為Spark當中的RDD(彈性分布式數據集)。流數據即以RDD的形式在Spark Streaming系統中進行運算。
圖 5-3-4 Spark Streaming的離散流二、Spark Streaming中的應用拓撲建立Spark Streaming同樣在系統中構建出DAG的處理模型。不過與Storm不同,Spark Streaming並不使用固定的處理單元來執行單一的操作。實際上,Spark Streaming中的DAG與Spark Core中的DAG相同,只是用DAG的形式將每一個時間分片對應的RDD進行運算的job來進一步劃分成任務集stage,以便進行高效的批處理。Spark Streaming沿用了Spark Core對RDD提供的transformation操作,將所有RDD依次進行轉換,應用邏輯分別進行轉換處理,進而實現對整個離散流的轉換。圖5-3-5展示了Spark Streaming的整體計算框架,一方面在線輸入的數據流被按照時間切分為若幹小批次數據並被轉化成為RDD存儲在內存中,另一方面,根據流應用邏輯,也即流處理引用抽象出DAG拓撲,制定出相應的RDD transformation。RDD不斷被批量執行transformation操作,直到產生最終的結果。
圖 5-3-5 Spark Streaming 計算框架[7]
三、Spark Streaming中的並行度指定
由於Spark Streaming本質上是將數據流的任務劃分成為大量的微批數據,對應多個job來執行,所以Spark Streaming的並行度設定與Spark進行批處理時的設定一樣,只能設定整體job的並行度,而不能對每個操作單獨的並行度進行設置。然而由於批處理的特性,Spark?Streaming可以最大化對系統並行能力的利用,也能獲得相對更高的系統吞吐率。四、Spark Streaming中的數據分組和傳輸由於使用微批處理技術,Spark Streaming的數據被打包為一個個微批,而每個微批相互獨立地進行處理,所以不涉及所提到的數據分組與傳輸問題。但這也展現出微批處理的一個局限性,其難以靈活處理基於用戶自定義的窗口的聚合、計數等操作,也不能進行針對數據流的連續計算,如兩個數據流的實時連接等操作。五、Spark Streaming的系統框架Spark Streaming建立在Spark系統之上,其系統架構相對於Spark的修改和新增部分如圖5-3-6所示。
圖 5-3-6 Spark Streaming 基於Spark修改和新增的組件[7]除開Spark系統本身組件外,Spark Streaming主要組件如下。master:是Spark Streaming中流應用的入口。根據應用邏輯產生用於轉換RDD的task然後進行調度,並對這些task進行追蹤。D-Stream?lineage包含了離散流間的轉換關系,類似流應用的DAG圖。client:Spark Streaming建立了一個client庫來將數據傳入到系統當中。worker:是Spark Streaming中流數據的入口以及執行RDD轉換的主要組件。相對於Spark,主要新增了input receiver對流數據進行獨立的接收。流數據可以是從系統外在線地進行讀取進來,並轉化為離散流的形式,也可以是經過其他execution執行轉化後的離散流。六、Spark Streaming的編程示例Spark Streaming的編程較為簡單,這是由於它本身基於Spark建立,有豐富的API可以調用,可以省去大量無關的編碼。下面同樣以WordCount應用為例來對Spark Streaming的編程模型進行說明。(1)離散流的輸入和數據封裝在WordCount應用中,假定直接從一個socket來獲取源源不斷的句子數據流,那麽數據流的輸入具體實現如代碼5-3-4所示。
以上代碼中,首先建立了JavaStreamingContext對象,同時需要指定劃分離散流的時間間隔。本例中指定了每隔1s就劃分一次微批。接著,指定從端口8888的socket中持續獲取數據流。通過以上代碼,每個executor獲取的數據流就會根據1s的時間間隔不斷劃分成小批次,並進一步轉化為RDD。這一串RDD的組合即是新產生的“lines”離散流。(2)建立應用拓撲,進行離散流的轉化離散流的轉化即根據相應的應用邏輯指定對應的RDD的轉化方式。在WordCount應用中,先將句子轉化為若幹的單詞,然後將每個單詞變成(單詞,計數)的二元對,最後對相同單詞的二元對計數進行累加。具體實現如代碼5-3-5所示。
以上代碼中,利用Spark豐富的transformation方法,將由一個個句子組成的“lines”離散流首先通過flatMap的方式映射為由單詞組成的“words”離散流。進一步通過mapToPair的方式映射為(單詞,計數)二元對組成的“pairs”離散流,這裏每個單詞沒有累加前,計數值就直接等於1。最後通過reduceByKey的方式,對相同單詞的計數進行累加操作。?Apache FlinkApache Flink是一個同時支持分布式數據流處理和數據批處理的大數據處理系統。其特點是完全以流處理的角度出發進行設計,而將批處理看作是有邊界的流處理特殊流處理來執行。Flink可以表達和執行許多類別的數據處理應用程序,包括實時數據分析、連續數據管道、歷史數據處理(批處理)和叠代算法(機器學習、圖表分析等)。Flink同樣是使用單純流處理方法的典型系統,其計算框架與原理和Apache Storm比較相似。Flink做了許多上層的優化,也提供了豐富的API供開發者能更輕松地完成編程工作。一、Flink中的數據封裝Flink能夠支撐對多種類型的數據進行處理,例如Flink支撐任意的Java或者Scala類型,這使得Flink使用更加靈活。類似Storm,Flink同樣也可以使用多字段的tuple為其基本數據單元。Flink可以支持了多種Flink tuple類型(tuple1至tuple25),每種tuple都是一個固定長度的對象序列。二、Flink中的應用拓撲建立Flink中核心概念為數據流(stream)和轉換(transformation)。每個轉換對應的是一個簡單的操作,根據應用邏輯,轉換按先後順序構成了流應用的DAG圖,如圖5-3-7所示。數據流在轉換之間傳遞,直到完成所有的轉換進行輸出。Flink應用包含明確的源操作和匯聚操作,用於數據的輸入與輸出。
Flink內 部 實 現 了 許 多 基 本 的 轉 換 操 作, 比 如Map、FlatMap、Reduce、Window等, 同 時 也 實 現 了 許 多 源 和 匯 聚 操 作, 比 如writeAsText、writeAsCsv、print等。Flink提供了豐富的API以簡化用戶對應用拓撲的編寫和表達。三、Flink中的並行度指定與Storm相似,Flink程序的計算框架本質上也並行分布式的。在系統中,一個流包含一個或多個流分區,而每一個轉換操作包含一個或多個子任務實例。操作的子任務間彼此獨立,以不同的線程執行,可以運行在不同的機器或容器上。一個Flink應用同樣運行在一個或多個worker進程當中。一個worker中生成一個或多個task slot。每個task slot用以承載和執行Flink每個轉換操作的一個子任務實例。Flink可以指定全局的task slot數目作為其最大的並行度。同時若部分轉換不需要使用如此多資源,Flink也可以指定每一操作具體的子任務數。每個轉換操作對應的子任務默認輪詢地分布在分配的task slot內。四、Flink中的數據分組與傳輸Flink的數據分組方法主要包括一對一(one-to-one)模式或者重分組(redistributing)模式兩種。采用一對一模式時,數據流中元素的分組和順序會保持不變,也就是說,對於上下遊的兩個不同的轉換操作,下遊任一子任務內要處理的元組數據,與上遊相同順序的子任務所處理的元組數據完全一致。采用重分組模式則會改變數據流所在的分組。重分組後元組的目標子任務根據處理的變換方法不同而發生改變。例如經過keyBy( )轉化,元組就會根據keyBy( )的參數選擇對應的字段作為key值,進行哈希計算來重新分組。經過broadcast( )轉化即相應地進行廣播等。五、Flink的系統框架圖5-3-8顯示了Apache Flink的分布式運行環境架構。Flink的系統架構中包含以下重要組件。jobclinet:jobclient是一個獨立的程序執行入口。job client負責接收用戶提交的程序,並將用戶提交的程序通過優化器和graph builder轉換成dataflow graph(類似流應用的DAG圖)。然後將生成的data flow提交給job manager進行job的管理和調度。一旦執行完成,job client返回給用戶最後的執行結果。jobmanager:對應一個Flink程序的master進程,負責job的管理和資源的協調。主要包括任務調度、監控任務的執行狀態、協調任務的執行、檢查點管理和失敗恢復等。
圖 5-3-8 Apache Flink分 布 式運行環境[9]taskmanager和taskslot:是Flink中具體負責執行tasks的組件。每個taskmanage對應是運行在節點上的JVM進程,擁有一定的量的資源。比如內存、CPU、網絡、磁盤等。每個執行的task運行在其中的一個或多個線程中。taskslot是分布式程序真正執行task的地方。每個taskslot可以包括JVM進程中的一部分內存。六、Flink的編程示例Flink的編程核心也就在 數 據 流 和 轉 換 上。 下 面, 依 然 以WordCount為例來對Flink的編程模型進行說明。代碼5-3-6是Flink中以5分鐘為窗口進行一次求和統計的WordCount應用代碼。
在以上代碼中,定義了一個DataStream實例,並通過socket的方式從8888端口監聽在線獲取數據。監聽到的句子數據被使用flatmap轉化成單詞,並直接以(單詞,計數)二元對的形式記錄下來。當流被轉化為二元對後,接著根據當前第0位的字段“word”進行keyBy( )的操作,最後以5分鐘為窗口大小,對計數值進行累計。Flink的編程非常簡潔和直觀,上例中,DataStream從源操作從socket在線讀取數據,到各種轉換操作,到最後的匯聚求和操作都可以直接表達出來。Flink提供了豐富的API和各種表達上的簡化來降低用戶的編程難度和編程量。上例通過使用env.setParallelism來設置流處理程序的整體並行度,即taskslot數量為8。同時,可以進一步為每一個操作設置並行度,如在saveAsText( )操作後通過使用setParallelism將這個操作的並行度修改為1。
從Storm到Flink:大數據處理的開源系統及編程模型