1. 程式人生 > >Spark Streaming 技術點彙總

Spark Streaming 技術點彙總

Spark Streaming 支援實時資料流的可擴充套件(Scalable)、高吞吐(high-throughput)、容錯(fault-tolerant)的流處理(stream processing)。
Spark Streaming 支援實時資料流的可擴充套件(Scalable)、高吞吐(high-throughput)、容錯(fault-tolerant)的流處理(stream processing)。

架構圖
特性如下:
• 可線性伸縮至超過數百個節點;
• 實現亞秒級延遲處理;
• 可與 Spark 批處理和互動式處理無縫整合;
• 提供簡單的API實現複雜演算法;
• 更多的流方式支援,包括 Kafka、Flume、Kinesis、Twitter、ZeroMQ 等。
001、原理
Spark 在接收到實時輸入資料流後,將資料劃分成批次(divides the data into batches),然後轉給 Spark Engine 處理,按批次生成最後的結果流(generate the final stream of results in batches)。

002、API
DStream:
DStream(Discretized Stream,離散流)是 Spark Stream 提供的高階抽象連續資料流。
組成:一個 DStream 可看作一個 RDDs 序列。
核心思想:將計算作為一系列較小時間間隔的、狀態無關的、確定批次的任務,每個時間間隔內接收的輸入資料被可靠儲存在叢集中,作為一個輸入資料集。

特性:一個高層次的函數語言程式設計 API、強一致性以及高校的故障恢復。
應用程式模板:
模板1

模板2

WordCount示例

Input DStream:
Input DStream 是一種從流式資料來源獲取原始資料流的 DStream,分為基本輸入源(檔案系統、Socket、Akka Actor、自定義資料來源)和高階輸入源(Kafka、Flume等)。
Receiver:
每個 Input DStream(檔案流除外)都會對應一個單一的 Receiver物件,負責從資料來源接收資料並存入 Spark 記憶體進行處理。應用程式中可建立多個 Input DStream 並行接收多個數據流。
每個 Receiver 是一個長期執行在Worker或者 Executor 上的 Task,所以會佔用該應用程式的一個核(core)。如果分配給 Spark Streaming 應用程式的核數小於或等於 Input DStream 個數(即Receiver個數),則只能接收資料,卻沒有能力全部處理(檔案流除外,因為無需Receiver)。
Spark Streaming 已封裝各種資料來源,需要時參考官方文件。
Transformation Operation
常用Transformation

updateStateByKey(func)
updateStateByKey可對DStream中的資料按key做reduce,然後對各批次資料累加
WordCount的updateStateByKey版本

transform(func)
通過對原 DStream 的每個 RDD 應用轉換函式,建立一個新的 DStream。
官方文件程式碼舉例

Window operations
視窗操作:基於 window 對資料 transformation(個人認為與Storm的tick相似,但功能更強大)。
引數:視窗長度(window length)和滑動時間間隔(slide interval)必須是源DStream 批次間隔的倍數。
舉例說明:視窗長度為3,滑動時間間隔為2;上一行是原始 DStream,下一行是視窗化的 DStream。

常見 window operation

官方文件程式碼舉例

join(otherStream, [numTasks])
連線資料流
官方文件程式碼舉例1

官方文件程式碼舉例2

Output Operation

快取與持久化:
通過 persist()將 DStream 中每個 RDD 儲存在記憶體。
Window operations 會自動持久化在記憶體,無需顯示呼叫 persist()。
通過網路接收的資料流(如Kafka、Flume、Socket、ZeroMQ、RocketMQ等)執行 persist()時,預設在兩個節點上持久化序列化後的資料,實現容錯。
Checkpoint:
用途:Spark 基於容錯儲存系統(如HDFS、S3)進行故障恢復。
分類:
元資料檢查點:儲存流式計算資訊用於 Driver 執行節點的故障恢復,包括建立應用程式的配置、應用程式定義的 DStream operations、已入隊但未完成的批次。
資料檢查點:儲存生成的 RDD。由於 stateful transformation 需要合併多個批次的資料,即生成的 RDD 依賴於前幾個批次 RDD 的資料(dependency chain),為縮短 dependency chain 從而減少故障恢復時間,需將中間 RDD 定期儲存至可靠儲存(如HDFS)。
使用時機:
Stateful transformation:updateStateByKey()以及 window operations。
需要 Driver 故障恢復的應用程式。
003、使用方法
Stateful transformation

需要 Driver 故障恢復的應用程式(以WordCount舉例):如果 checkpoint 目錄存在,則根據 checkpoint 資料建立新 StreamingContext;否則(如首次執行)新建 StreamingContext。

checkpoint 時間間隔
方法:

原則:一般設定為滑動時間間隔的5-10倍。
分析:checkpoint 會增加儲存開銷、增加批次處理時間。當批次間隔較小(如1秒)時,checkpoint 可能會減小 operation 吞吐量;反之,checkpoint 時間間隔較大會導致 lineage 和 task 數量增長。
004、效能調優
降低批次處理時間:
資料接收並行度
增加 DStream:接收網路資料(如Kafka、Flume、Socket等)時會對資料反序列化再儲存在 Spark,由於一個 DStream 只有 Receiver 物件,如果成為瓶頸可考慮增加 DStream。

設定“spark.streaming.blockInterval”引數:接收的資料被儲存在 Spark 記憶體前,會被合併成 block,而 block 數量決定了Task數量;舉例,當批次時間間隔為2秒且 block 時間間隔為200毫秒時,Task 數量約為10;如果Task數量過低,則浪費了 CPU 資源;推薦的最小block時間間隔為50毫秒。
顯式對 Input DStream 重新分割槽:在進行更深層次處理前,先對輸入資料重新分割槽。

資料處理並行度:reduceByKey、reduceByKeyAndWindow 等 operation 可通過設定“spark.default.parallelism”引數或顯式設定並行度方法引數控制。
資料序列化:可配置更高效的 Kryo 序列化。
設定合理批次時間間隔
原則:處理資料的速度應大於或等於資料輸入的速度,即批次處理時間大於或等於批次時間間隔。
方法:
先設定批次時間間隔為5-10秒以降低資料輸入速度;
再通過檢視 log4j 日誌中的“Total delay”,逐步調整批次時間間隔,保證“Total delay”小於批次時間間隔。
記憶體調優
持久化級別:開啟壓縮,設定引數“spark.rdd.compress”。
GC策略:在Driver和Executor上開啟CMS。