1. 程式人生 > >Spark Performance Tuning (性能調優)

Spark Performance Tuning (性能調優)

() man inter ber index data- key 兩種 跟蹤

在集群上的 Spark Streaming application 中獲得最佳性能需要一些調整.本節介紹了可調整的多個 parameters (參數)和 configurations (配置)提高你的應用程序性能.在高層次上, 你需要考慮兩件事情:

  1. 通過有效利用集群資源, Reducing the processing time of each batch of data (減少每批數據的處理時間).

  2. 設置正確的 batch size (批量大小), 以便 batches of data (批量的數據)可以像 received (被接收)處理一樣快(即 data processing (數據處理)與 data ingestion (數據攝取)保持一致).

Reducing the Batch Processing Times (減少批處理時間)

在 Spark 中可以進行一些優化, 以 minimize the processing time of each batch (最小化每批處理時間).這些已在 Tuning Guide (調優指南) 中詳細討論過.本節突出了一些最重要的.

Level of Parallelism in Data Receiving (數據接收中的並行級別)

通過網絡接收數據(如Kafka, Flume, socket 等)需要 deserialized (反序列化)數據並存儲在 Spark 中.如果數據接收成為系統的瓶頸, 那麽考慮一下 parallelizing the data receiving (並行化數據接收).註意每個 input DStream 創建接收 single stream of data (單個數據流)的 single receiver (單個接收器)(在 work machine 上運行). 因此, 可以通過創建多個 input DStreams 來實現 Receiving multiple data streams (接收多個數據流)並配置它們以從 source(s) 接收 data stream (數據流)的 different partitions (不同分區).例如, 接收 two topics of data (兩個數據主題)的單個Kafka input DStream 可以分為兩個 Kafka input streams (輸入流), 每個只接收一個 topic (主題).這將運行兩個 receivers (接收器), 允許 in parallel (並行)接收數據, 從而提高 overall throughput (總體吞吐量).這些 multiple DStreams 可以 unioned (聯合起來)創建一個 single DStream .然後 transformations (轉化)為應用於 single input DStream 可以應用於 unified stream .如下這樣做.

1 val numStreams = 5
2 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
3 val unifiedStream = streamingContext.union(kafkaStreams)
4 unifiedStream.print()

應考慮的另一個參數是 receiver’s block interval (接收器的塊間隔), 這由configuration parameter (配置參數) 的 spark.streaming.blockInterval

決定.對於大多數 receivers (接收器), 接收到的數據 coalesced (合並)在一起存儲在 Spark 內存之前的 blocks of data (數據塊).每個 batch (批次)中的 blocks (塊)數確定將用於處理接收到的數據以 map-like (類似與 map 形式的) transformation (轉換)的 task (任務)的數量.每個 receiver (接收器)每 batch (批次)的任務數量將是大約( batch interval (批間隔)/ block interval (塊間隔)).例如, 200 ms的 block interval (塊間隔)每 2 秒 batches (批次)創建 10 個 tasks (任務).如果 tasks (任務)數量太少(即少於每個機器的內核數量), 那麽它將無效, 因為所有可用的內核都不會被使用處理數據.要增加 given batch interval (給定批間隔)的 tasks (任務)數量, 請減少 block interval (塊間??隔).但是, 推薦的 block interval (塊間隔)最小值約為 50ms , 低於此任務啟動開銷可能是一個問題.

使用 multiple input streams (多個輸入流)/ receivers (接收器)接收數據的替代方法是明確 repartition (重新分配) input data stream (輸入數據流)(使用 inputStream.repartition(<number of partitions>) ). 這會在 further processing (進一步處理)之前將 received batches of data (收到的批次數據) distributes (分發)到集群中指定數量的計算機.

Level of Parallelism in Data Processing (數據處理中的並行度水平)

如果在任何 computation (計算)階段中使用 number of parallel tasks (並行任務的數量), 則 Cluster resources (集群資源)可能未得到充分利用. 例如, 對於 distributed reduce (分布式 reduce)操作, 如 reduceByKeyreduceByKeyAndWindow , 默認並行任務的數量由 spark.default.parallelism configuration property 控制. 您 可以通過 parallelism (並行度)作為參數(見 PairDStreamFunctions 文檔 ), 或設置 spark.default.parallelism configuration property 更改默認值.

Data Serialization (數據序列化)

可以通過調優 serialization formats (序列化格式)來減少數據 serialization (序列化)的開銷.在 streaming 的情況下, 有兩種類型的數據被 serialized (序列化).

  • Input data (輸入數據): 默認情況下, 通過 Receivers 接收的 input data (輸入數據)通過 StorageLevel.MEMORY_AND_DISK_SER_2 存儲在 executors 的內存中.也就是說, 將數據 serialized (序列化)為 bytes (字節)以減少 GC 開銷, 並復制以容忍 executor failures (執行器故障).此外, 數據首先保留在內存中, 並且只有在內存不足以容納 streaming computation (流計算)所需的所有輸入數據時才會 spilled over (溢出)到磁盤.這個 serialization (序列化)顯然具有開銷 - receiver (接收器)必須使接收的數據 deserialize (反序列化), 並使用 Spark 的 serialization format (序列化格式)重新序列化它.

  • Persisted RDDs generated by Streaming Operations (流式操作生成的持久 RDDs): 通過 streaming computations (流式計算)生成的 RDD 可能會持久存儲在內存中.例如, window operations (窗口操作)會將數據保留在內存中, 因為它們將被處理多次.但是, 與 StorageLevel.MEMORY_ONLY 的 Spark Core 默認情況不同, 通過流式計算生成的持久化 RDD 將以 StorageLevel.MEMORY_ONLY_SER (即序列化), 以最小化 GC 開銷.

在這兩種情況下, 使用 Kryo serialization (Kryo 序列化)可以減少 CPU 和內存開銷.有關詳細信息, 請參閱 Spark Tuning Guide .對於 Kryo , 請考慮 registering custom classes , 並禁用對象引用跟蹤(請參閱 Configuration Guide 中的 Kryo 相關配置).

在 streaming application 需要保留的數據量不大的特定情況下, 可以將數據(兩種類型)作為 deserialized objects (反序列化對象)持久化, 而不會導致過多的 GC 開銷.例如, 如果您使用幾秒鐘的 batch intervals (批次間隔)並且沒有 window operations (窗口操作), 那麽可以通過明確地相應地設置 storage level (存儲級別)來嘗試禁用 serialization in persisted data (持久化數據中的序列化).這將減少由於序列化造成的 CPU 開銷, 潛在地提高性能, 而不需要太多的 GC 開銷.

Task Launching Overheads (任務啟動開銷)

如果每秒啟動的任務數量很高(比如每秒 50 個或更多), 那麽這個開銷向 slaves 發送任務可能是重要的, 並且將難以實現 sub-second latencies (次要的延遲).可以通過以下更改減少開銷:

  • Execution mode (執行模式): 以 Standalone mode (獨立模式)或 coarse-grained Mesos 模式運行 Spark 比 fine-grained Mesos 模式更好的任務啟動時間.有關詳細信息, 請參閱 Running on Mesos guide .

這些更改可能會將 batch processing time (批處理時間)縮短 100 毫秒, 從而允許 sub-second batch size (次秒批次大小)是可行的.


Setting the Right Batch Interval (設置正確的批次間隔)

對於在集群上穩定地運行的 Spark Streaming application, 該系統應該能夠處理數據盡可能快地被接收.換句話說, 應該處理批次的數據就像生成它們一樣快.這是否適用於 application 可以在 monitoring streaming web UI 中的 processing times 中被找到, processing time (批處理處理時間)應小於 batch interval (批間隔).

取決於 streaming computation (流式計算)的性質, 使用的 batch interval (批次間隔)可能對處理由應用程序持續一組固定的 cluster resources (集群資源)的數據速率有重大的影響.例如, 讓我們考慮早期的 WordCountNetwork 示例.對於特定的 data rate (數據速率), 系統可能能夠跟蹤每 2 秒報告 word counts (即 2 秒的 batch interval (批次間隔)), 但不能每 500 毫秒.因此, 需要設置 batch interval (批次間隔), 使預期的數據速率在生產可以持續.

為您的應用程序找出正確的 batch size (批量大小)的一個好方法是使用進行測試 conservative batch interval (保守的批次間隔)(例如 5-10 秒)和 low data rate (低數據速率).驗證是否系統能夠跟上 data rate (數據速率), 可以檢查遇到的 end-to-end delay (端到端延遲)的值通過每個 processed batch (處理的批次)(在 Spark driver log4j 日誌中查找 “Total delay” , 或使用 StreamingListener 接口). 如果 delay (延遲)保持與 batch size (批量大小)相當, 那麽系統是穩定的.除此以外, 如果延遲不斷增加, 則意味著系統無法跟上, 因此不穩定.一旦你有一個 stable configuration (穩定的配置)的想法, 你可以嘗試增加 data rate and/or 減少 batch size .請註意, momentary increase (瞬時增加)由於延遲暫時增加只要延遲降低到 low value (低值), 臨時數據速率增加就可以很好(即, 小於 batch size (批量大小)).

Spark Performance Tuning (性能調優)