1. 程式人生 > >spark的反壓與推測機制

spark的反壓與推測機制

反壓背景

在預設情況下,Spark Streaming 通過 receivers (或者是 Direct 方式) 以生產者生產資料的速率接收資料。當 batch processing time > batch interval 的時候,也就是每個批次資料處理的時間要比 Spark Streaming 批處理間隔時間長;越來越多的資料被接收,但是資料的處理速度沒有跟上,導致系統開始出現數據堆積,可能進一步導致 Executor 端出現 OOM 問題而出現失敗的情況。

而在 Spark 1.5 版本之前,為了解決這個問題,對於 Receiver-based 資料接收器,我們可以通過配置 spark.streaming.receiver.maxRate

 引數來限制每個 receiver 每秒最大可以接收的記錄的資料;對於 Direct Approach 的資料接收,我們可以通過配置 spark.streaming.kafka.maxRatePerPartition 引數來限制每次作業中每個 Kafka 分割槽最多讀取的記錄條數。spark.streaming.kafka.consumer.poll.ms:設定spark拉取kafka資料的等待時間,預設 512毫秒 太短需要設定,這種方法雖然可以通過限制接收速率,來適配當前的處理能力,但這種方式存在以下幾個問題:

  • 我們需要事先估計好叢集的處理速度以及訊息資料的產生速度;
  • 這兩種方式需要人工參與,修改完相關引數之後,我們需要手動重啟 Spark Streaming 應用程式;
  • 如果當前叢集的處理能力高於我們配置的 maxRate,而且 producer 產生的資料高於 maxRate,這會導致叢集資源利用率低下,而且也會導致資料不能夠及時處理。

Spark 資料堆積

Spark Streaming反壓機制的核心原理

簡單介紹:反壓機制核心功能就是控制spark程式最多能消費的資料量,保證資料不積壓

注意事項:一旦啟動反壓機制,設定spark.streaming.kafka.maxRatePerPartition kafka最大分割槽拉取數就失效了

Spark Streaming 反壓機制的使用

        在 Spark 啟用反壓機制很簡單,一般來說只需要將 spark.streaming.backpressure.enabled 設定為 true 即可,這個引數的預設值為 false。特殊情況反壓機制還涉及以下幾個引數,包括文件中沒有列出來的:

  • spark.streaming.backpressure.initialRate: 啟用反壓機制時每個接收器接收第一批資料的初始最大速率。預設值沒有設定。
  • spark.streaming.backpressure.rateEstimator:速率估算器類,預設值為 pid ,目前 Spark 只支援這個,大家可以根據自己的需要實現。
  • spark.streaming.backpressure.pid.proportional:用於響應錯誤的權重(最後批次和當前批次之間的更改)。預設值為1,只能設定成非負值。weight for response to "error" (change between last batch and this batch)
  • spark.streaming.backpressure.pid.integral:錯誤積累的響應權重,具有抑制作用(有效阻尼)。預設值為 0.2 ,只能設定成非負值。weight for the response to the accumulation of error. This has a dampening effect.
  • spark.streaming.backpressure.pid.derived:對錯誤趨勢的響應權重。 這可能會引起 batch size 的波動,可以幫助快速增加/減少容量。預設值為0,只能設定成非負值。weight for the response to the trend in error. This can cause arbitrary/noise-induced fluctuations in batch size, but can also help react quickly to increased/reduced capacity.
  • spark.streaming.backpressure.pid.minRate:可以估算的最低費率是多少。預設值為 100,只能設定成非負值。

推測機制

spark推測式執行

            推測任務是指對於一個Stage裡面拖後腿的Task,會在其他節點的Executor上再次啟動這個task,如果其中一個Task例項執行成功則將這個最先完成的Task的計算結果作為最終結果,同時會幹掉其他Executor上執行的例項。spark推測式執行預設是關閉的,可通過spark.speculation屬性來開啟。

推測機制原理

           當成功的Task數超過總Task數的75%(可通過引數spark.speculation.quantile設定進行更改)時,再統計所有成功的Tasks的執行時間,得到一箇中位數,用這個中位數乘以1.5(可通過引數spark.speculation.multiplier控制進行更改)得到執行時間門限,如果在執行的Tasks的執行時間超過這個門限,則對它啟用推測。簡單來說就是對那些拖慢整體進度的Tasks啟用推測,以加速整個Stage的執行

// 資料出現某個task執行不完的情況,開啟推測機制,有可能是網路問題等等
		/*conf.set("spark.speculation.interval","100") // 檢測週期,單位為毫秒
		conf.set("spark.speculation.quantile","0.75") // 完成task的百分比時啟動推測
		conf.set("spark.speculation.multiplier","1.5") // 比其它的慢多少倍時啟動推測
		conf.set("spark.streaming.concurrentJobs","3") // 控制job併發度,啟動執行緒執行*/