Spark Streaming原始碼解讀之Driver中的ReceiverTracker詳解
本篇博文的目標是:
Driver的ReceiverTracker接收到資料之後,下一步對資料是如何進行管理
一:ReceiverTracker的架構設計
1. Driver在Executor啟動Receiver方式,每個Receiver都封裝成一個Task,此時一個Job中就一個Task,而Task中就一條資料,也就是Receiver資料。由此,多少個Job也就可以啟動多少個Receiver.
2. ReceiverTracker在啟動Receiver的時候他有ReceiverSupervisor,其實現是ReceiverSupervisorImpl, ReceiverSupervisor本身啟動的時候會啟動Receiver,Receiver不斷的接收資料,通過BlockGenerator將資料轉換成Block。定時器會不斷的把Block資料通過BlockManager或者WAL進行儲存,資料儲存之後ReceiverSupervisorImpl會把儲存後的資料的元資料Metadate彙報給ReceiverTracker,其實是彙報給ReceiverTracker中的RPC實體ReceiverTrackerEndpoint.
ReceiverSupervisorImpl會將元資料彙報給ReceiverTracker,那麼接收到之後,下一步就對資料進行管理。
- 通過receivedBlockHandler寫資料
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
//WAL
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get )
} else {
//BlockManager
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
2. PushAndReportBlock儲存Block資料,且把資訊彙報給Driver。
/** Store block and report it to driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
3. 此時trackerEndpoint是ReceiverTrackerEndpoint
/** Remote RpcEndpointRef for the ReceiverTracker */
private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)
4. ReceivedBlockInfo:封裝Block的儲存資訊。
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int, //block屬於哪個接收的流
numRecords: Option[Long],//多少條記錄
metadataOption: Option[Any],//元資料資訊
blockStoreResult: ReceivedBlockStoreResult
) {
require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
@volatile private var _isBlockIdValid = true
def blockId: StreamBlockId = blockStoreResult.blockId
def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
blockStoreResult match {
case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
case _ => None
}
}
/** Is the block ID valid, that is, is the block present in the Spark executors. */
def isBlockIdValid(): Boolean = _isBlockIdValid
/**
* Set the block ID as invalid. This is useful when it is known that the block is not present
* in the Spark executors.
*/
def setBlockIdInvalid(): Unit = {
_isBlockIdValid = false
}
}
5. ReceivedBlockStoreResult:
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
// Any implementation of this trait will store a block id
def blockId: StreamBlockId
// Any implementation of this trait will have to return the number of records
def numRecords: Option[Long]
}
ReceiverTracker的原始碼原始碼遍歷
1. 下面的訊息是完成Receiver和ReceiverTracker之間通訊的。
/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
* with each other.
*/
//這裡使用sealed意思是ReceiverTrackerMessage包含所有的訊息。
private[streaming] sealed trait ReceiverTrackerMessage
private[streaming] case class RegisterReceiver(
streamId: Int,
typ: String,
host: String,
executorId: String,
receiverEndpoint: RpcEndpointRef
) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
extends ReceiverTrackerMessage
private[streaming] case class ReportError(streamId: Int, message: String, error: String)
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
extends ReceiverTrackerMessage
2. Driver和ReceiverTrackerEndpoint之間的交流通過ReceiverTrackerLocalMessage。
/**
* Messages used by the driver and ReceiverTrackerEndpoint to communicate locally.
*/
private[streaming] sealed trait ReceiverTrackerLocalMessage
3. ReceiverTrackerLocalMessage中的子類
/**
* This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.
*/
//從起Receiver
private[streaming] case class RestartReceiver(receiver: Receiver[_])
extends ReceiverTrackerLocalMessage
/**
* This message is sent to ReceiverTrackerEndpoint when we start to launch Spark jobs for receivers
* at the first time.
*/
//啟動Receiver的集合
private[streaming] case class StartAllReceivers(receiver: Seq[Receiver[_]])
extends ReceiverTrackerLocalMessage
/**
* This message will trigger ReceiverTrackerEndpoint to send stop signals to all registered
* receivers.
*/
//程式結束的時候會發出停止所有Receiver的資訊。
private[streaming] case object StopAllReceivers extends ReceiverTrackerLocalMessage
/**
* A message used by ReceiverTracker to ask all receiver's ids still stored in
* ReceiverTrackerEndpoint.
*/
//正在存資訊的是ReceiverTrackerEndpoint
private[streaming] case object AllReceiverIds extends ReceiverTrackerLocalMessage
// UpdateReceiverRateLimit例項可能會有幾個,因此在程式執行的時候需要限流。
private[streaming] case class UpdateReceiverRateLimit(streamUID: Int, newRate: Long)
extends ReceiverTrackerLocalMessage
4. ReceiverTracker:管理Receiver的啟動,Receiver的執行,回收,執行過程中接收資料的管理。DStreamGraph中會有成員記錄所有的資料流來源,免得每次都去檢索。
/**
* This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
* this class must be created after all input streams have been added and StreamingContext.start()
* has been called because it needs the final set of input streams at the time of instantiation.
*
* @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
*/
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
//所有的InputStream都會交給graph
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map { _.id }
private val receivedBlockTracker = new ReceivedBlockTracker(
ssc.sparkContext.conf,
ssc.sparkContext.hadoopConfiguration,
receiverInputStreamIds,
ssc.scheduler.clock,
ssc.isCheckpointPresent,
Option(ssc.checkpointDir)
)
private val listenerBus = ssc.scheduler.listenerBus
ReceiverTracker中的receiverAndReply:
- ReceiverTrackerEndpoint接收訊息,並回復addBlock訊息。
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// Remote messages
case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
val successful =
registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
context.reply(successful)
case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
walBatchingThreadPool.execute(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
if (active) {
context.reply(addBlock(receivedBlockInfo))
} else {
throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
}
}
})
} else {
context.reply(addBlock(receivedBlockInfo))
}
case DeregisterReceiver(streamId, message, error) =>
deregisterReceiver(streamId, message, error)
context.reply(true)
// Local messages
//檢視是否有活躍的Receiver
case AllReceiverIds =>
context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
//停止所有Receivers
case StopAllReceivers =>
assert(isTrackerStopping || isTrackerStopped)
stopReceivers()
context.reply(true)
}
2. addBlock原始碼如下:
/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
receivedBlockTracker.addBlock(receivedBlockInfo)
}
3. ReceiverBlockTracker的addBlock原始碼如下:把具體的一個Receiver彙報上來的資料的元資料資訊寫入streamIdToUnallocatedBlockQueues中。
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
try {
val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
if (writeResult) {
synchronized {
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
s"block ${receivedBlockInfo.blockStoreResult.blockId}")
} else {
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
}
writeResult
} catch {
case NonFatal(e) =>
logError(s"Error adding block $receivedBlockInfo", e)
false
}
}
其中getReceivedBlockQueue是ReceivedBlockQueue型別。
/** Get the queue of received blocks belonging to a particular stream */
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
4. 其中HashMap中第一個引數是StreamId,第二個引數ReceivedBlockQueue是StreamId對應接收到的Receiver.
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
5. WritetToLog原始碼如下:
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
if (isWriteAheadLogEnabled) { //先判斷是否可以寫入到log中。
logTrace(s"Writing record: $record")
try {
//write方法將資料寫入 writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
clock.getTimeMillis())
true
} catch {
case NonFatal(e) =>
logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
false
}
} else {
true
}
}
ReceiverBlockTracker原始碼分析:
1. 保持跟蹤所有接收到的Block。並且根據需要把他們分配給batches.
假設提供checkpoint的話,ReceiverBlockTracker中的資訊包括receiver接收到的block資料和分配的資訊。Driver如果失敗的話,就讀取checkpoint中的資訊。
/**
* Class that keep track of all the received blocks, and allocate them to batches
* when required. All actions taken by this class can be saved to a write ahead log
* (if a checkpoint directory has been provided), so that the state of the tracker
* (received blocks and block-to-batch allocations) can be recovered after driver failure.
*
* Note that when any instance of this class is created with a checkpoint directory,
* it will try reading events from logs in the directory.
*/
private[streaming] class ReceivedBlockTracker(
2. ReceivedBlockTracker通過呼叫allocateBlocksToBatch方法把接收到的資料分配給當前執行的Batch Duractions作業。
allocateBlocksToBatch被JobGenerator呼叫的。
/**
* Allocate all unallocated blocks to the given batch.
* This event will get written to the write ahead log (if enabled).
*/
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
// allocatedBlocks是接收到資料
// batchTime 是時間
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
//每次分配的時候都會更新時間
lastAllocatedBatchTime = batchTime
} else {
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
} else {
// This situation occurs when:
// 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
// possibly processed batch job or half-processed batch job need to be processed again,
// so the batchTime will be equal to lastAllocatedBatchTime.
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
}
JobGenerator中的generateJob
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
//
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
3. AllocatedBlocks原始碼如下:
/** Class representing the blocks of all the streams allocated to a batch */
private[streaming]
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
}
}
ReceiverTracker的receive方法架構如下:
4. ReceiverTracker中receive原始碼如下:
override def receive: PartialFunction[Any, Unit] = {
// Local messages
//啟動所有的receivers,在ReceiverTracker剛啟動的時候會給自己發訊息,通過//schedulingPolicy來觸發訊息。
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
//當Executor幫我們分配Receiver或者Receiver失效,然後給自己發訊息觸發Receiver重新分發。
case RestartReceiver(receiver) =>
// Old scheduled executors minus the ones that are not active any more
val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
// Try global scheduling again
oldScheduledExecutors
} else {
val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
// Clear "scheduledLocations" to indicate we are going to do local scheduling
val newReceiverInfo = oldReceiverInfo.copy(
state = ReceiverState.INACTIVE, scheduledLocations = None)
receiverTrackingInfos(receiver.streamId) = newReceiverInfo
schedulingPolicy.rescheduleReceiver(
receiver.streamId,
receiver.preferredLocation,
receiverTrackingInfos,
getExecutors)
}
// Assume there is one receiver restarting at one time, so we don't need to update
// receiverTrackingInfos
startReceiver(receiver, scheduledLocations)
//當我們快要完成資料計算的時候,會發送此訊息,將所有的Receiver交給我們
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
// ReceiverTracker可以動態的調整Receiver接收的RateLimit
case UpdateReceiverRateLimit(streamUID, newRate) =>
for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
eP.send(UpdateRateLimit(newRate))
}
// Remote messages
//
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
}
5. 在ReceiverSupervisorImpl的receive方法中就接收到了ReceiverTracker的CleanupOldBlocks訊息。
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
"Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
//根據時間就clean Old Block
cleanupOldBlocks(threshTime)
//
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.foreach { bg =>
bg.updateRate(eps)
}
}
})
6. RateLimiter中的updateRate原始碼如下:
/**
* Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
* {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
*
* @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
*/
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)
}
}
}
7. 其中setRate原始碼如下:
/**
* Updates the stable rate of this {@code RateLimiter}, that is, the
* {@code permitsPerSecond} argument provided in the factory method that
* constructed the {@code RateLimiter}. Currently throttled threads will <b>not</b>
* be awakened as a result of this invocation, thus they do not observe the new rate;
* only subsequent requests will.
*
* <p>Note though that, since each request repays (by waiting, if necessary) the cost
* of the <i>previous</i> request, this means that the very next request
* after an invocation to {@code setRate} will not be affected by the new rate;
* it will pay the cost of the previous request, which is in terms of the previous rate.
*
* <p>The behavior of the {@code RateLimiter} is not modified in any other way,
* e.g. if the {@code RateLimiter} was configured with a warmup period of 20 seconds,
* it still has a warmup period of 20 seconds after this method invocation.
*
* @param permitsPerSecond the new stable rate of this {@code RateLimiter}.
*/
public final void setRate(double permitsPerSecond) {
Preconditions.checkArgument(permitsPerSecond > 0.0
&& !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex) {
resync(readSafeMicros());
double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
}
ReceiverTracker中receiveAndReply中StopAllReceivers流程如下:
1. stopReceivers原始碼如下:
/** Send stop signal to the receivers. */
private def stopReceivers() {
receiverTrackingInfos.values.flatMap(_.endpoint).foreach
//給ReceiverSupervisorImpl傳送訊息。
{ _.send(StopReceiver) }
logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
}
}
2. 在ReceiverSupervisorImpl中receive接收到了此訊息。
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
"Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.foreach { bg =>
bg.updateRate(eps)
}
}
})
3. stop函式在ReceiverSupervisor中實現的。
/** Mark the supervisor and the receiver for stopping */
def stop(message: String, error: Option[Throwable]) {
stoppingError = error.orNull
stopReceiver(message, error)
onStop(message, error)
futureExecutionContext.shutdownNow()
stopLatch.countDown()
}
4. stopReceiver原始碼如下:
/** Stop receiver */
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
try {
logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
receiverState match {
case Initialized =>
logWarning("Skip stopping receiver because it has not yet stared")
case Started =>
receiverState = Stopped
receiver.onStop()
logInfo("Called receiver onStop")
onReceiverStop(message, error)
case Stopped =>
logWarning("Receiver has been stopped")
}
} catch {
case NonFatal(t) =>
logError("Error stopping receiver " + streamId + t.getStackTraceString)
}
}
5. 最終呼叫onStop方法
/**
* This method is called by the system when the receiver is stopped. All resources
* (threads, buffers, etc.) setup in `onStart()` must be cleaned up in this method.
*/
def onStop()
6. onReceiverStop方法在子類ReceiverSupervisorImpl中會有具體實現。
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
//告訴Driver端也就是ReceiverTracker呼叫DeregisterReceiver
trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
logInfo("Stopped receiver " + streamId)
}
7. onStop方法在ReceiverSupervisorImpl中實現如下:
override protected def onStop(message: String, error: Option[Throwable]) {
registeredBlockGenerators.foreach { _.stop() }
//停止訊息迴圈
env.rpcEnv.stop(endpoint)
}
StopAllReceivers全流程如下:
總結:
Receiver接收到資料之後合併儲存資料後,ReceiverSupervisorImpl會把資料彙報給ReceiverTracker, ReceiverTracker接收到元資料,其內部彙報的是RPC通訊體,接收到資料之後,內部有ReceivedBlockTracker會管理資料的分配,JobGenerator會將每個Batch,每次工作的時候會根據元資料資訊從ReceiverTracker中獲取相應的元資料資訊生成RDD。
ReceiverBlockTracker中 allocateBlocksToBatch專門管理Block元資料資訊,作為一個內部的管理物件。
門面設計模式:
ReceiverTracker和ReceivedBlockTracker的關係是:具體幹活的是ReceivedBlockTracker,但是外部代表是ReceiverTracker。
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
//為每個Receiver單獨維護一個Queue
// streamIdToUnallocatedBlockQueues裡面封裝的是所有彙報上來的資料,但是沒有被分配的資料。
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
//維護的是已經分配到Batch的元資料資訊。
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
private val writeAheadLogOption = createWriteAheadLog()
private var lastAllocatedBatchTime: Time = null
JobGenerator在計算基於Batch的Job的時候,我們的DStreamGraph生成RDD的DAG的時候會呼叫此方法。
/** Get the blocks allocated to the given batch. */
//此方法就會生成RDD。
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
}
當一個Batch計算完的時候,他會把已經使用的資料塊的資料資訊清理掉。
/**
* Clean up block information of old batches. If waitForCompletion is true, this method
* returns only after the files are cleaned up.
*/
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
} else {
logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
}
}