Spark Streaming Performance Tuning: How to Dynamically Handle Streaming Data Peaks in Production
1. Why Backpressure Was Introduced
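By default, Spark Streaming Receivers ingest data at whatever rate the producer emits it. During computation, batch processing time can exceed batch interval, where batch processing time is the time actually spent computing one batch and batch interval is the batching interval configured for the Streaming application. When that happens, data is received faster than Spark removes it from the queue: processing capacity is too low to finish, within one interval, the data received at the current rate. If this persists, data accumulates in memory and can cause problems such as out-of-memory errors on the Executor hosting the Receiver (if the configured StorageLevel includes disk, data that does not fit in memory spills to disk, which increases latency).
Before Spark 1.5, users who wanted to limit a Receiver's ingest rate could set the static configuration parameter "spark.streaming.receiver.maxRate". Capping the ingest rate adapts it to the current processing capacity and prevents out-of-memory errors, but it introduces other problems. For example, if the producer's rate is higher than maxRate and the cluster's current processing capacity is also higher than maxRate, resources are underutilized. To better coordinate the ingest rate with processing capacity, Spark Streaming introduced a backpressure mechanism in v1.5, which dynamically controls the ingest rate to match the cluster's processing capacity.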
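The accumulation described above can be illustrated with a few lines of arithmetic. The following is a minimal sketch, not Spark code: `BacklogSketch`, `backlogPerBatch`, and all record counts are hypothetical names and numbers chosen for illustration.

```scala
object BacklogSketch {
  /**
   * Records still waiting after each batch interval, assuming a constant
   * number of records ingested per interval and a constant number of
   * records the cluster can process per interval (both hypothetical).
   */
  def backlogPerBatch(ingestPerBatch: Long, capacityPerBatch: Long, batches: Int): Seq[Long] =
    (1 to batches)
      .scanLeft(0L) { (backlog, _) =>
        // The backlog grows by the surplus and never drops below zero.
        math.max(0L, backlog + ingestPerBatch - capacityPerBatch)
      }
      .tail // drop the initial zero backlog
}
```

With 1200 records ingested and 1000 processed per interval, the backlog after three intervals is 200, 400, 600; once ingest drops to the capacity or below, it stays at zero. A static maxRate fixes the first case but also throttles a cluster that could have processed more.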
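2. Backpressure
Spark Streaming backpressure dynamically adjusts the Receiver's ingest rate based on job execution information fed back by the JobScheduler. The property "spark.streaming.backpressure.enabled" controls whether the backpressure mechanism is enabled; the default is false (disabled).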
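As a configuration sketch, the two properties named so far can be collected as plain key/value pairs. The object name `BackpressureSettings` and the value 10000 are assumptions made for illustration; only the property keys come from the text.

```scala
object BackpressureSettings {
  // Plain key/value pairs, so this snippet runs without Spark on the classpath.
  val settings: Map[String, String] = Map(
    // Turn on the dynamic rate control described in this section (default: false).
    "spark.streaming.backpressure.enabled" -> "true",
    // Optional static upper bound in records per second; 10000 is a hypothetical value.
    "spark.streaming.receiver.maxRate" -> "10000"
  )
  // With Spark available, the pairs could be applied with
  //   new SparkConf().setAll(BackpressureSettings.settings)
}
```

Keeping maxRate alongside backpressure is a deliberate choice in this sketch: it acts as a hard ceiling while backpressure moves the effective rate below it.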
(Screenshot from the official Spark 2.4.0 documentation.)
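2.1 The Streaming architecture is shown in the figure below (for details, see the Streaming data-receiving process document and the Streaming source code analysis)
2.2 The backpressure execution flow is shown in the figure below:
A new component, RateController, is added to the original architecture. It listens for "OnBatchCompleted" events and extracts the processingDelay and schedulingDelay from them. The Estimator uses this information to estimate the maximum processing rate. Finally, the Receiver-based Input Stream forwards the rate through ReceiverTracker and ReceiverSupervisorImpl to BlockGenerator (which extends RateLimiter).
3. Backpressure Source Code Analysis
3.1 RateController class hierarchy
RateController extends StreamingListener and handles BatchCompleted events. The core code is: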
/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {
  init()
  protected def publish(rate: Long): Unit
  @transient
  implicit private var executionContext: ExecutionContext = _
  @transient
  private var rateLimit: AtomicLong = _
  /**
   * An initialization method called both from the constructor and Serialization code.
   */
  private def init() {
    executionContext = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
    rateLimit = new AtomicLong(-1L)
  }
  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    init()
  }
  /**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }
  def getLatestRate(): Long = rateLimit.get()
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo
    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}
3.2 Registering the RateController
When JobScheduler starts, it collects the rateController of every InputDStream registered in the DStreamGraph and registers each one as a listener on the ListenerBus. The code is:
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)
  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }
  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  receiverTracker.start()
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
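The registration loop in start() iterates over a collection of input streams and, for each one, over an optional rate controller, so streams without a controller are skipped silently. A minimal sketch of that pattern follows; `Controller`, `InputStream`, and `controllersToRegister` are hypothetical stand-ins, not Spark classes.

```scala
import scala.collection.mutable.ArrayBuffer

object RegistrationSketch {
  // Hypothetical stand-ins for RateController and InputDStream.
  final case class Controller(streamId: Int)
  final case class InputStream(id: Int, rateController: Option[Controller])

  /** Returns the controllers that the loop would register as listeners. */
  def controllersToRegister(streams: Seq[InputStream]): Seq[Controller] = {
    val registered = ArrayBuffer.empty[Controller]
    for {
      stream <- streams
      controller <- stream.rateController // a None contributes no iteration
    } registered += controller
    registered.toList
  }
}
```

The same result could be written as `streams.flatMap(_.rateController)`; the for-comprehension form is used here only because it mirrors the shape of the loop in start().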
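3.3 Backpressure execution analysis
Backpressure execution consists of two phases: triggering the BatchCompleted event and handling it.
3.3.1 How BatchCompleted is triggered
The analysis of BatchCompleted should start from JobGenerator: BatchCompleted marks the end of a batch, that is, it is triggered when the jobs produced by JobGenerator finish executing. We therefore trace job execution.
In a Streaming application, on every batch interval JobGenerator creates one Job for each Output Stream in the application. All Jobs of the batch form a Job Set, which is submitted in bulk through JobScheduler's submitJobSet. The code is structured as follows: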
/** Generate jobs and perform checkpointing for the given `time`. */
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
submitJobSet hands each Job in the Job Set to jobExecutor, a fixed-size pool of background threads whose size is set by "spark.streaming.concurrentJobs" (default 1). The implementation is:
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
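The effect of a fixed-size executor can be sketched with the JDK's own thread pool. This is an illustration under assumptions, not the Spark implementation: `JobExecutorSketch` and `runJobs` are hypothetical, and the jobs do nothing except count themselves.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object JobExecutorSketch {
  /**
   * Submits `jobCount` trivial jobs to a pool of `concurrentJobs` threads
   * and returns how many of them ran to completion.
   */
  def runJobs(jobCount: Int, concurrentJobs: Int): Int = {
    val pool = Executors.newFixedThreadPool(concurrentJobs)
    val finished = new AtomicInteger(0)
    (1 to jobCount).foreach { _ =>
      pool.execute(new Runnable {
        // At most `concurrentJobs` of these run at once; the rest wait in the queue.
        override def run(): Unit = { finished.incrementAndGet(); () }
      })
    }
    pool.shutdown() // accept no new jobs, let queued ones finish
    val terminated = pool.awaitTermination(10, TimeUnit.SECONDS)
    require(terminated, "jobs did not finish in time")
    finished.get()
  }
}
```

With a pool size of 1, jobs of one batch run strictly one after another, which is why a slow output operation delays every later job in the queue.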
JobHandler runs the Job and handles its result. When the Job finishes, a JobCompleted event is produced. The JobHandler logic is shown below:
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._
  def run() {
    val oldProps = ssc.sparkContext.getLocalProperties
    try {
      ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
      // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
      ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
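When a Job finishes, a JobCompleted event is sent to the eventLoop. On receiving it, the EventLoop event handler calls handleJobCompletion to process the completion. Once every Job in the Job Set has completed, handleJobCompletion uses the execution information to create a StreamingListenerBatchCompleted event and posts it to the listeners through StreamingListenerBus. The implementation is: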
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}
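The key point in handleJobCompletion is that the batch-completed event fires only when the last Job of the Job Set finishes. A minimal sketch of that bookkeeping, assuming a set of outstanding job ids; `JobSetSketch` and its members are hypothetical and simplify whatever the real JobSet stores.

```scala
object JobSetSketch {
  /** Tracks which jobs of one batch have not finished yet. */
  final class JobSet(jobIds: Set[Int]) {
    private var incomplete: Set[Int] = jobIds
    def handleJobCompletion(jobId: Int): Unit = incomplete -= jobId
    def hasCompleted: Boolean = incomplete.isEmpty
  }

  /**
   * For each job completion, in order, reports whether a batch-completed
   * event would be posted at that point.
   */
  def batchCompletedFlags(jobIds: Set[Int], completionOrder: Seq[Int]): Seq[Boolean] = {
    val jobSet = new JobSet(jobIds)
    completionOrder.map { id =>
      jobSet.handleJobCompletion(id)
      jobSet.hasCompleted
    }
  }
}
```

For three jobs finishing in the order 2, 0, 1, only the third completion reports a finished batch, so the rate estimator sees one update per batch rather than one per output operation.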
3.3.2 How the BatchCompleted event is handled
StreamingListenerBus dispatches the event to the concrete StreamingListener implementations, so BatchCompleted is handled by RateController. On receiving the event, RateController calls onBatchCompleted to process it.
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo
  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
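Every value read in onBatchCompleted is optional, so the for-comprehension runs its body only when all four are present. The sketch below shows that behavior in isolation; `BatchInfoSketch`, its trimmed-down `BatchInfo`, and `estimatorInputs` are hypothetical names, not the Spark types.

```scala
object BatchInfoSketch {
  // Hypothetical, trimmed-down view of the fields read by onBatchCompleted.
  final case class BatchInfo(
      processingEndTime: Option[Long],
      processingDelay: Option[Long],
      schedulingDelay: Option[Long],
      numRecords: Option[Long])

  /** Returns (time, elems, workDelay, waitDelay) only if every field is defined. */
  def estimatorInputs(info: BatchInfo): Option[(Long, Long, Long, Long)] =
    for {
      processingEnd <- info.processingEndTime
      workDelay <- info.processingDelay
      waitDelay <- info.schedulingDelay
      elems <- info.numRecords
    } yield (processingEnd, elems, workDelay, waitDelay)
}
```

A batch with any missing field therefore produces no rate update at all, rather than an update computed from partial data.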
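onBatchCompleted extracts the processing end time, the processing delay, the scheduling delay, and the record count from the completed batch, and passes them to the RateEstimator (currently the only implementation is PIDRateEstimator, a proportional-integral-derivative (PID) controller) to estimate a new rate, which is then published. The code is: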
/**
 * Compute the new rate limit and publish it asynchronously.
 */
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
  Future[Unit] {
    val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
    newRate.foreach { s =>
      rateLimit.set(s.toLong)
      publish(getLatestRate())
    }
  }
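To make the role of the estimator concrete, here is a simplified sketch of one PID update step that takes the same four inputs as rateEstimator.compute. It is modeled on the general idea of a PID controller and is not a copy of PIDRateEstimator: `PidSketch`, `State`, `step`, the gain values, and the minimum rate are all assumptions for illustration.

```scala
object PidSketch {
  /** State carried between batches: time of last update (ms), last rate, last error. */
  final case class State(timeMs: Long, rate: Double, error: Double)

  /**
   * One update step. Returns the new rate (records per second) and the next
   * state, or None when the inputs cannot yield a meaningful estimate.
   */
  def step(
      state: State,
      timeMs: Long,            // processing end time of the batch
      numElements: Long,       // records in the batch
      processingDelayMs: Long, // time spent processing the batch
      schedulingDelayMs: Long, // time the batch waited before processing
      batchIntervalMs: Long,
      kp: Double = 1.0,        // illustrative gains, not tuned values
      ki: Double = 0.2,
      kd: Double = 0.0,
      minRate: Double = 100.0): Option[(Double, State)] =
    if (timeMs <= state.timeMs || numElements <= 0 || processingDelayMs <= 0) None
    else {
      val delaySinceUpdate = (timeMs - state.timeMs).toDouble / 1000 // seconds
      // Rate the cluster actually sustained for this batch.
      val processingRate = numElements.toDouble / processingDelayMs * 1000
      // Proportional term: how far the current limit is above what was sustained.
      val error = state.rate - processingRate
      // Integral term: records implied by the time the batch spent waiting.
      val historicalError = schedulingDelayMs.toDouble * processingRate / batchIntervalMs
      // Derivative term: how quickly the error is changing.
      val dError = (error - state.error) / delaySinceUpdate
      val newRate =
        math.max(state.rate - kp * error - ki * historicalError - kd * dError, minRate)
      Some((newRate, State(timeMs, newRate, error)))
    }
}
```

For example, starting from a limit of 1000 records per second, a batch of 1000 records that took 2000 ms to process with no scheduling delay yields a sustained rate of 500, so the sketch lowers the limit to 500; any scheduling delay pushes the new limit lower still.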
publish forwards the newly computed rate through ReceiverTracker. ReceiverTracker wraps the rate in an UpdateReceiverRateLimit message and hands it to ReceiverTrackerEndpoint:
/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
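On receiving the message, ReceiverTrackerEndpoint looks up, in receiverTrackingInfos, the endpoint the Receiver used when it registered (in practice ReceiverSupervisorImpl), wraps the rate in an UpdateRateLimit message, and sends it to that endpoint. On receipt, the endpoint calls updateRate on its BlockGenerators (subclasses of RateLimiter), which results in a fixed token interval being computed.
RateLimiter's updateRate is implemented as follows: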
/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
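The capping rule in updateRate can be restated as a pure function, which makes the three cases easy to check. This is a sketch that mirrors the branches shown above; `RateCapSketch` and `effectiveRate` are hypothetical names.

```scala
object RateCapSketch {
  /**
   * Returns the rate that would be applied, or None when the update is ignored.
   * A non-positive `maxRateLimit` means that no static ceiling is configured.
   */
  def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
    if (newRate <= 0) None                                          // ignored: no effect
    else if (maxRateLimit > 0) Some(math.min(newRate, maxRateLimit)) // capped by maxRate
    else Some(newRate)                                               // no ceiling configured
}
```

This shows why a configured maxRate still matters with backpressure enabled: the estimator can lower the rate freely, but it can never raise it above the static ceiling.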
setRate, which comes from the underlying Guava RateLimiter, is implemented as follows:
/**
 * Updates the stable rate of this {@code RateLimiter}, that is, the
 * {@code permitsPerSecond} argument provided in the factory method that
 * constructed the {@code RateLimiter}. Currently throttled threads will <b>not</b>
 * be awakened as a result of this invocation, thus they do not observe the new rate;
 * only subsequent requests will.
 *
 * <p>Note though that, since each request repays (by waiting, if necessary) the cost
 * of the <i>previous</i> request, this means that the very next request
 * after an invocation to {@code setRate} will not be affected by the new rate;
 * it will pay the cost of the previous request, which is in terms of the previous rate.
 *
 * <p>The behavior of the {@code RateLimiter} is not modified in any other way,
 * e.g. if the {@code RateLimiter} was configured with a warmup period of 20 seconds,
 * it still has a warmup period of 20 seconds after this method invocation.
 *
 * @param permitsPerSecond the new stable rate of this {@code RateLimiter}.
 */
public final void setRate(double permitsPerSecond) {
  Preconditions.checkArgument(permitsPerSecond > 0.0
      && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex) {
    resync(readSafeMicros());
    double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }
}
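The token interval computed in setRate is simply one second, in microseconds, divided by the permitted rate. A small sketch of that arithmetic, with the same argument check; `StableIntervalSketch` and `stableIntervalMicros` are hypothetical names used only here.

```scala
import java.util.concurrent.TimeUnit

object StableIntervalSketch {
  /** Microseconds between two permits at the given stable rate. */
  def stableIntervalMicros(permitsPerSecond: Double): Double = {
    require(permitsPerSecond > 0.0 && !permitsPerSecond.isNaN, "rate must be positive")
    TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond
  }
}
```

At 500 records per second the interval is 2000 microseconds; halving the rate doubles the interval, which is how a lower estimate from the controller translates into slower ingestion at the Receiver.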
This completes the rate adjustment performed by the backpressure mechanism.