1. 程式人生 > >Spark Streaming:大規模流式資料處理

Spark Streaming:大規模流式資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。 

  • 複雜的批量資料處理(batch data processing),通常的時間跨度在數十分鐘到數小時之間。
  • 基於歷史資料的互動式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間。
  • 基於實時資料流的資料處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。 
目前已有很多相對成熟的開源軟體來處理以上三種情景,我們可以利用MapReduce來進行批量資料處理,可以用Impala來進行互動式查詢,對於流式資料處理,我們可以採用Storm。對於大多數網際網路公司來說,一般都會同時遇到以上三種情景,那麼在使用的過程中這些公司可能會遇到如下的不便。 

  • 三種情景的輸入輸出資料無法無縫共享,需要進行格式相互轉換。
  • 每一個開源軟體都需要一個開發和維護團隊,提高了成本。
  • 在同一個叢集中對各個系統協調資源分配比較困難。 
BDAS就是以Spark為基礎的一套軟體棧,利用基於記憶體的通用計算模型將以上三種情景一網打盡,同時支援Batch、Interactive、Streaming的處理,且相容支援HDFS和S3等分散式檔案系統,可以部署在YARN和Mesos等流行的叢集資源管理器之上。BDAS的構架如圖1所示,其中Spark可以替代MapReduce進行批處理,利用其基於記憶體的特點,特別擅長迭代式和互動式資料處理;Shark處理大規模資料的SQL查詢,相容Hive的HQL。本文要重點介紹的Spark Streaming,在整個BDAS中進行大規模流式處理。

 

圖1 BDAS軟體棧 

Spark Streaming構架 

  • 計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Streaming的輸入資料按照batch size(如1秒)分成一段一段的資料(Discretized Stream),每一段資料都轉換成Spark中的RDD(Resilient Distributed Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果儲存在記憶體中。整個流式計算根據業務的需求可以對中間的結果進行疊加,或者儲存到外部裝置。圖2顯示了Spark Streaming的整個流程。

 

圖2 Spark Streaming構架圖


  • 容錯性:對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分散式可重算的資料集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入資料是可容錯的,那麼任意一個RDD的分割槽(Partition)出錯或不可用,都是可以利用原始輸入資料通過轉換操作而重新算出的。

圖3 Spark Streaming中RDD的lineage關係圖 

  • 對於Spark Streaming來說,其RDD的傳承關係如圖3所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連線的,由於Spark Streaming輸入資料可以來自於磁碟,例如HDFS(多份拷貝)或是來自於網路的資料流(Spark Streaming會將網路輸入資料的每一個數據流拷貝兩份到其他的機器)都能保證容錯性。所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。 
  • 實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段資料的處理都會經過Spark DAG圖分解,以及Spark的任務集的排程過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

  • 擴充套件性與吞吐量:Spark目前在EC2上已能夠線性擴充套件到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的資料量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。

圖4 Spark Streaming與Storm吞吐量比較圖

Spark Streaming的程式設計模型 

Spark Streaming的程式設計和Spark的程式設計如出一轍,對於程式設計的理解也非常類似。對於Spark來說,程式設計就是對於RDD的操作;而對於Spark Streaming來說,就是對DStream的操作。下面將通過一個大家熟悉的WordCount的例子來說明Spark Streaming中的輸入操作、轉換操作和輸出操作。 

  • Spark Streaming初始化:在開始進行DStream操作之前,需要對Spark Streaming進行初始化生成StreamingContext。引數中比較重要的是第一個和第三個,第一個引數是指定Spark Streaming執行的叢集地址,而第三個引數是指定Spark Streaming執行時的batch視窗大小。在這個例子中就是將1秒鐘的輸入資料進行一次Spark Job處理。
val ssc = new StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes], [Jars]) 
  •  Spark Streaming的輸入操作:目前Spark Streaming已支援了豐富的輸入介面,大致分為兩類:一類是磁碟輸入,如以batch size作為時間間隔監控HDFS檔案系統的某個目錄,將目錄中內容的變化作為Spark Streaming的輸入;另一類就是網路流的方式,目前支援Kafka、Flume、Twitter和TCP socket。在WordCount例子中,假定通過網路socket作為輸入流,監聽某個特定的埠,最後得出輸入DStream(lines)。
val lines = ssc.socketTextStream(“localhost”,8888)
  • Spark Streaming的轉換操作:與Spark RDD的操作極為類似,Spark Streaming也就是通過轉換操作將一個或多個DStream轉換成新的DStream。常用的操作包括map、filter、flatmap和join,以及需要進行shuffle操作的groupByKey/reduceByKey等。在WordCount例子中,我們首先需要將DStream(lines)切分成單詞,然後將相同單詞的數量進行疊加, 最終得到的wordCounts就是每一個batch size的(單詞,數量)中間結果。 
val words = lines.flatMap(_.split(“ ”))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)


另外,Spark Streaming有特定的視窗操作,視窗操作涉及兩個引數:一個是滑動視窗的寬度(Window Duration);另一個是視窗滑動的頻率(Slide Duration),這兩個引數必須是batch size的倍數。例如以過去5秒鐘為一個輸入視窗,每1秒統計一下WordCount,那麼我們會將過去5秒鐘的每一秒鐘的WordCount都進行統計,然後進行疊加,得出這個視窗中的單詞統計。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))


但上面這種方式還不夠高效。如果我們以增量的方式來計算就更加高效,例如,計算t+4秒這個時刻過去5秒視窗的WordCount,那麼我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量(如圖5所示),這種方法可以複用中間三秒的統計量,提高統計的效率。 

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))



圖5 Spark Streaming中滑動視窗的疊加處理和增量處理 

  • Spark Streaming的輸入操作:對於輸出操作,Spark提供了將資料列印到螢幕及輸入到檔案中。在WordCount中我們將DStream wordCounts輸入到HDFS檔案中。
wordCounts = saveAsHadoopFiles(“WordCount”)
  • Spark Streaming啟動:經過上述的操作,Spark Streaming還沒有進行工作,我們還需要呼叫Start操作,Spark Streaming才開始監聽相應的埠,然後收取資料,並進行統計。
ssc.start()

Spark Streaming案例分析 

在網際網路應用中,網站流量統計作為一種常用的應用模式,需要在不同粒度上對不同資料進行統計,既有實時性的需求,又需要涉及到聚合、去重、連線等較為複雜的統計需求。傳統上,若是使用Hadoop MapReduce框架,雖然可以容易地實現較為複雜的統計需求,但實時性卻無法得到保證;反之若是採用Storm這樣的流式框架,實時性雖可以得到保證,但需求的實現複雜度也大大提高了。Spark Streaming在兩者之間找到了一個平衡點,能夠以準實時的方式容易地實現較為複雜的統計需求。 下面介紹一下使用Kafka和Spark Streaming搭建實時流量統計框架。 

  • 資料暫存:Kafka作為分散式訊息佇列,既有非常優秀的吞吐量,又有較高的可靠性和擴充套件性,在這裡採用Kafka作為日誌傳遞中介軟體來接收日誌,抓取客戶端傳送的流量日誌,同時接受Spark Streaming的請求,將流量日誌按序傳送給Spark Streaming叢集。
  • 資料處理:將Spark Streaming叢集與Kafka叢集對接,Spark Streaming從Kafka叢集中獲取流量日誌並進行處理。Spark Streaming會實時地從Kafka叢集中獲取資料並將其儲存在內部的可用記憶體空間中。當每一個batch視窗到來時,便對這些資料進行處理。 
  • 結果儲存:為了便於前端展示和頁面請求,處理得到的結果將寫入到資料庫中。 

相比於傳統的處理框架,Kafka+Spark Streaming的架構有以下幾個優點。 

  • Spark框架的高效和低延遲保證了Spark Streaming操作的準實時性。
  • 利用Spark框架提供的豐富API和高靈活性,可以精簡地寫出較為複雜的演算法。 
  • 程式設計模型的高度一致使得上手Spark Streaming相當容易,同時也可以保證業務邏輯在實時處理和批處理上的複用。 

在基於Kafka+Spark Streaming的流量統計應用執行過程中,有時會遇到記憶體不足、GC阻塞等各種問題。下面介紹一下如何對Spark Streaming應用程式進行調優來減少甚至避免這些問題的影響。 

效能調優 

優化執行時間

  •  增加並行度。確保使用整個叢集的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操作,增加其並行度以確保更為充分地使用叢集資源。
  • 減少資料序列化、反序列化的負擔。Spark Streaming預設將接收到的資料序列化後儲存以減少記憶體的使用。但序列化和反序列化需要更多的CPU時間,因此更加高效的序列化方式(Kryo)和自定義的序列化介面可以更高效地使用CPU。 
  • 設定合理的batch視窗。在Spark Streaming中,Job之間有可能存在著依賴關係,後面的Job必須確保前面的Job執行結束後才能提交。若前面的Job執行時間超出了設定的batch視窗,那麼後面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成後續Job的阻塞。因此,設定一個合理的batch視窗確保Job能夠在這個batch視窗中結束是必須的。 
  • 減少任務提交和分發所帶來的負擔。通常情況下Akka框架能夠高效地確保任務及時分發,但當batch視窗非常小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常會比使用Fine-Grained Mesos模式有更小的延遲。 

優化記憶體使用

  • 控制batch size。Spark Streaming會把batch視窗內接收到的所有資料存放在Spark內部的可用記憶體區域中,因此必須確保當前節點Spark的可用記憶體至少能夠容納這個batch視窗內所有的資料,否則必須增加新的資源以提高叢集的處理能力。
  • 及時清理不再使用的資料。上面說到Spark Streaming會將接收到的資料全部儲存於內部的可用記憶體區域中,因此對於處理過的不再需要的資料應及時清理以確保Spark Streaming有富餘的可用記憶體空間。通過設定合理的spark.cleaner.ttl時長來及時清理超時的無用資料。 
  • 觀察及適當調整GC策略。GC會影響Job的正常執行,延長Job的執行時間,引起一系列不可預料的問題。觀察GC的執行情況,採取不同的GC策略以進一步減小記憶體回收對Job執行的影響。 

總結 

Spark Streaming提供了一套高效、可容錯的準實時大規模流式處理框架,它能和批處理及即時查詢放在同一個軟體棧中,降低學習成本。如果你學會了Spark程式設計,那麼也就學會了Spark Streaming程式設計,如果理解了Spark的排程和儲存,那麼Spark Streaming也類似。對開源軟體感興趣的讀者,我們可以一起貢獻社群。目前Spark已在Apache孵化器中。按照目前的發展趨勢,Spark Streaming一定將會得到更大範圍的使用。

相關推薦

Spark Streaming大規模資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。  複雜的批量資料處理(batch data processing),通常的時間跨度

Spark學習——Spark Streaming大規模資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。  複雜的批量資料處理(batch data processing),通常的時間跨度

spark學習五 DStream(spark資料處理

流資料的特點 與一般的檔案(即內容已經固定)型資料來源相比,所謂的流資料擁有如下的特點 1.   資料一直處在變化中 2.   資料無法回退 3.   資料一直源源不斷的湧進 DStream 如果要用一句話來概括SparkStreaming的處理思路的話,那就是"將連續的資

kubernetes log 資料處理

PS: 最近在重構公司的業務容器化平臺,記錄一塊。關於容器日誌的, kubernetes python API本身提供了日誌流式資料,在以前的版本是不會輸出新資料的,後續版本進行了改進。 直接上程式碼 Flask 前端路由塊 # Router """獲取專案pod的日誌""" @api_cluster

資料處理

1、直接登陸伺服器:ssh 2014210***@thumedia.org -p 6349 建立streaming.py:   touch streaming.py,並且如下編輯: <span style="font-size:14px;">#! /usr/

JDK8 新特性資料處理

在學習JDK8新特性Optional類的時候,提到對於Optional的兩個操作對映和過濾設計到JDK提供的流式出來。這篇文章便詳細的介紹流式處理: 一. 流式處理簡介 流式處理給開發者的第一感覺就是讓集合操作變得簡潔了許多,通常我們需要多行程式碼才能完

【java8】持續精進-之資料處理

流式處理簡介 在我接觸到java8流式處理的時候,我的第一感覺是流式處理讓集合操作變得簡潔了許多,通常我們需要多行程式碼才能完成的操作,藉助於流式處理可以在一行中實現。比如我們希望對一個包含整數的集合中篩選出所有的偶數,並將其封裝成為一個新的List返回,那麼

資料求索(9): log4j + flume + kafka + spark streaming實時日誌處理實戰

大資料求索(9): log4j + flume + kafka + spark streaming實時日誌流處理實戰 一、實時流處理 1.1 實時計算 跟實時系統類似(能在嚴格的時間限制內響應請求的系統),例如在股票交易中,市場資料瞬息萬變,決策通常需要秒級甚至毫秒級。通俗來

Flume+Kafka+Spark Streaming實現大資料實時資料採集

大資料實時流式資料處理是大資料應用中最為常見的場景,與我們的生活也息息相關,以手機流量實時統計來說,它總是能夠實時的統計出使用者的使用的流量,在第一時間通知使用者流量的使用情況,並且最為人性化的為使用者提供各種優惠的方案,如果採用離線處理,那麼等到使用者流量超標

基於Flume+Kafka+Spark Streaming打造實時處理項目實戰課程

大數據本課程從實時數據產生和流向的各個環節出發,通過集成主流的分布式日誌收集框架Flume、分布式消息隊列Kafka、分布式列式數據庫HBase、及當前最火爆的Spark Streaming打造實時流處理項目實戰,讓你掌握實時處理的整套處理流程,達到大數據中級研發工程師的水平!下載地址:百度網盤下載

資料學習storm計算

       Storm是一個分散式的、高容錯的實時計算系統。Storm適用的場景:   1、Storm可以用來用來處理源源不斷的訊息,並將處理之後的結果儲存到持久化介質中。   2、由於Storm的處理元件都是分散式的,而且處理延遲都極低,所以可以Storm可以做為

第4課Spark Streaming的Exactly Once的事務處理

本期內容: Exactly once 輸出不重複 Exactly once 1,事務一定會被處理,且只被處理一次; 2,輸出能夠輸出且只會被輸出。 Receiver:資料通過BlockManager寫入記憶體+磁碟或者通過WAL來保證資料的安全性。WAL機制:寫資料

Spark Streaming 到 Apache Flink : 實時資料在愛奇藝的演進

作者:陳越晨 整理:劉河 本文將為大家介紹Apache Flink在愛奇藝的生產與實踐過程。你可以藉此瞭解到愛奇藝引入A

Spark Streaming 和 Flink 誰是資料開發者的最愛

本文從程式設計模型、任務排程、時間機制、Kafka 動態分割槽的感知、容錯及處理語義、背壓等幾個方面對比 Spark Streaming 與 Flink,希望對有實時處理需求業務的企業端使用者在框架選型有所啟發。 程式設計模型對比 執行角色 Spark Streaming 執行時的角色(

tranquilizer實現BeamFactory資料寫入到Druid

package com.icsoc.report.druid; import com.google.common.collect.ImmutableList; import com.metamx.common.Granularity; import com.m

dplyr六個基本資料處理技法

摘要 本文簡介如何使用 dplyr 與 base R 語法進行六個基本資料處理技法,並支援初學者先從 dplyr 開始做基本資料處理技法這個論點。 論點起源 在 Tidyverse:R 語言學習之旅的新起點一文中我們提到過新興的 R 語言學習路徑可以從 tidyver

IO資料的讀寫傳輸

IO流概括圖:   IO流的分類:  按流:   輸入流(InputStream和Reader):從硬碟或者別的地方讀入記憶體 輸出流(OutputStream和Writer):從記憶體裡向硬碟或別的地方輸出 按操作型別: 位元組流(Inpu

Spark Streaming 基本工作原理

一、 Spark Streaming簡介 Spark Streaming是Spark Core API的一種擴充套件,它可以用於進行大規模、高吞吐量、容錯的實時資料流的處理。它支援從很多種資料來源中讀取資料,比如Kafka、Flume、Twitter、ZeroM

視音訊資料處理入門PCM音訊取樣資料處理

                =====================================================視音訊資料處理入門系列文章:=====================================================上一篇文章記錄了RGB/YUV視訊畫素

R in Action學習筆記一個簡單的資料處理例項

這是來自《R in Action》中的一個數據處理例項。 資料:一組學生的名字和其對應的數學、科學、英語的成績; 資料分析需求: 1、為所有學生確定一個單一的成績衡量指標; 2、將前20%的學生評定為A,接下來20%的學生評定為B,依次類推; 3、按照學生姓氏的字母順序對學生排序。