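Spark Streaming Explained Through a Case Study
阿新 • Published: 2019-02-16
This post is organized as follows:
Part 1: Case demonstration
Part 2: Source code analysis
Part 1: Case demonstration
Only the source code is posted here; hands-on runs and experiment results for this code will be added in detail later.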
package com.dt.spark.sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
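/**
* Uses Spark Streaming + Spark SQL to compute, online and continuously, the ranking of the hottest items in each
* e-commerce category, for example the three most popular phones in the phone category and the three most popular
* TVs in the TV category. This kind of example is highly relevant in real production environments.
* Technique: Spark Streaming + Spark SQL. Spark Streaming can use ML, SQL, GraphX and so on because of interfaces
* such as foreachRDD and transform, which operate on RDDs. With the RDD as the foundation, all other Spark
* features can be used directly, as simply as calling an API.
* Assume the data format is: user item category, for example Rocky Samsung Android
*/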
object OnlineTheTop3ItemForEachCategory2DB {
def main(args: Array[String]){
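/**
* Step 1: create the Spark configuration object SparkConf and set the runtime configuration of the Spark program.
* For example, setMaster sets the URL of the Master of the Spark cluster the program connects to. If it is set to
* local, the Spark program runs locally, which is particularly suitable for beginners with very limited
* hardware (for example only 1 GB of memory).
*/
val conf = new SparkConf() // create the SparkConf object
conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") // set the application name, which is shown in the monitoring UI while the program runs
// conf.setMaster("spark://Master:7077") // with this setting the program runs on the Spark cluster
conf.setMaster("local[6]")
// Set the batchDuration interval, which controls how often jobs are generated, and create the entry point of Spark Streaming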
val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
(clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))
// val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 + v2,
// (v1:Int, v2: Int) => v1 - v2, Seconds(60), Seconds(20))
val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
_-_, Seconds(60), Seconds(20))
categoryUserClickLogsDStream.foreachRDD { rdd => {
if (rdd.isEmpty()) {
println("No data inputted!!!")
} else {
val categoryItemRow = rdd.map(reducedItem => {
val category = reducedItem._1.split("_")(0)
val item = reducedItem._1.split("_")(1)
val click_count = reducedItem._2
Row(category, item, click_count)
})
val structType = StructType(Array(
StructField("category", StringType, true),
StructField("item", StringType, true),
StructField("click_count", IntegerType, true)
))
val hiveContext = new HiveContext(rdd.context)
val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
categoryItemDF.registerTempTable("categoryItemTable")
val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
" OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
" WHERE rank <= 3")
resultDataFrame.show()
val resultRowRDD = resultDataFrame.rdd
resultRowRDD.foreachPartition { partitionOfRecords => {
if (partitionOfRecords.isEmpty){
println("This RDD is not null but partition is null")
} else {
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
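// Assumes the pooled connection is a java.sql.Connection. One parameterized statement is reused
// for the whole partition instead of concatenating values into the SQL text.
val stmt = connection.prepareStatement(
"insert into categorytop3(category,item,client_count) values(?,?,?)")
partitionOfRecords.foreach(record => {
stmt.setString(1, record.getAs[String]("category"))
stmt.setString(2, record.getAs[String]("item"))
stmt.setInt(3, record.getAs[Int]("click_count"))
stmt.executeUpdate()
})
stmt.close()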
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
}
}
}
}
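/**
* Calling start on the StreamingContext internally invokes JobScheduler's start method, which runs a message loop.
* Inside JobScheduler.start, the JobGenerator and ReceiverTracker are constructed and their start methods are called:
* 1. Once started, JobGenerator keeps generating Jobs, one per batchDuration.
* 2. Once started, ReceiverTracker first launches the Receivers in the Spark cluster (in fact a ReceiverSupervisor
* is started first in the Executor). After a Receiver receives data, the data is stored in the Executor through the
* ReceiverSupervisor and its metadata is sent to the ReceiverTracker in the Driver, which manages the received
* metadata internally through ReceivedBlockTracker.
* Each batch interval produces a concrete Job. This Job is not a Job in the Spark Core sense; it is only the DAG of
* RDDs generated from the DStreamGraph. From a Java point of view it is like an instance of the Runnable interface.
* To run, the Job must be submitted to the JobScheduler, which uses a thread pool to pick a separate thread that
* submits the Job to the cluster (in fact, an RDD action executed in that thread triggers the real job).
* Why use a thread pool?
* 1. Jobs are generated continuously, so a thread pool improves efficiency; this is similar to how an Executor
* runs Tasks in a thread pool.
* 2. FAIR scheduling may be configured for Jobs, which also requires multiple threads.
*/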
ssc.start()
ssc.awaitTermination()
}
}
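To make the ranking step easier to follow, here is a minimal sketch in plain Scala collections of what the row_number() window query computes: the three most-clicked items per category. It does not use Spark; the names Top3PerCategory, ItemCount and topN are hypothetical and exist only for this illustration.

```scala
object Top3PerCategory {
  // One aggregated record: how many clicks an item received inside the window.
  final case class ItemCount(category: String, item: String, clickCount: Int)

  // Keep the n most-clicked items of every category, which mirrors
  // row_number() OVER (PARTITION BY category ORDER BY click_count DESC) <= n.
  def topN(rows: Seq[ItemCount], n: Int): Map[String, Seq[ItemCount]] =
    rows.groupBy(_.category).map { case (category, items) =>
      category -> items.sortBy(r => -r.clickCount).take(n)
    }
}
```

As with row_number(), ties are broken arbitrarily; this sketch simply keeps the input order for equal counts.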
Part 2: Source code analysis
Step 1: Create the StreamingContext.
val ssc = new StreamingContext(conf, Seconds(5))
1. The StreamingContext source is as follows:
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName a name for your job, to display on the cluster web UI
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(
master: String,
appName: String,
batchDuration: Duration,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map()) = {
this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
null, batchDuration)
}
2. The first argument passed to this(...) creates the SparkContext: Spark Streaming is simply an application running on top of Spark Core.
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
new SparkContext(conf)
}
Step 2: Obtain the input data source
val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
1. socketTextStream receives a socket data stream.
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
2. A SocketInputDStream instance is created.
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
3. Data is received through SocketReceiver.
private[streaming]
class SocketInputDStream[T: ClassTag](
ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
4. In SocketReceiver, the onStart method calls the receive method.
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
5. The receive method opens a network connection and receives the data coming from it.
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
// The connection is made using the host (IP) and port
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
6. The data received by receive produces a DStream, and a DStream internally wraps its data as RDDs.
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
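The receive loop above can be tried without a socket. The following is a minimal sketch, assuming an in-memory stream stands in for socket.getInputStream(); SocketReceiveSketch, bytesToLines and receiveAll are hypothetical names written for this illustration and are not the Spark implementations.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object SocketReceiveSketch {
  // Interpret a byte stream as UTF-8 text and expose it as an iterator of lines.
  val bytesToLines: InputStream => Iterator[String] = in => {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  // Stand-in for the receive() loop: pull records until the stream ends or
  // the stop flag is set, handing each record to `store`.
  def receiveAll[T](in: InputStream,
                    converter: InputStream => Iterator[T],
                    isStopped: () => Boolean)(store: T => Unit): Unit = {
    val iterator = converter(in)
    while (!isStopped() && iterator.hasNext) store(iterator.next())
  }
}
```

Passing the converter as a function is what lets the same loop serve any record type, which is the role the converter parameter plays in socketStream.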
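The call sequence by which socketTextStream reads data is: socketTextStream → socketStream → SocketInputDStream.getReceiver → SocketReceiver.onStart → receive → store.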
Step 3: Apply transformations according to your own business logic.
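For the case in Part 1, the transformations are a map to ("category_item", 1) pairs followed by reduceByKeyAndWindow with a reduce function and an inverse function. The sketch below models that in plain Scala, assuming each batch is just a Seq of log lines; WindowedClickCount and its methods are hypothetical names. Dropping keys whose count falls to zero is a choice of this sketch, not something the program in Part 1 configures.

```scala
object WindowedClickCount {
  // Turn one "user item category" log line into a ("category_item", 1) pair.
  def toKeyedClick(line: String): (String, Int) = {
    val fields = line.split(" ")
    (fields(2) + "_" + fields(1), 1)
  }

  // Count the clicks per key inside a single batch.
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines.map(toKeyedClick).groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Slide the window: add the batch that enters (reduce function, _ + _)
  // and subtract the batch that leaves (inverse function, _ - _).
  def slide(window: Map[String, Int],
            entering: Map[String, Int],
            leaving: Map[String, Int]): Map[String, Int] = {
    val keys = window.keySet ++ entering.keySet ++ leaving.keySet
    keys.map { k =>
      k -> (window.getOrElse(k, 0) + entering.getOrElse(k, 0) - leaving.getOrElse(k, 0))
    }.filter(_._2 != 0).toMap
  }
}
```

The inverse function is what makes the window incremental: only the entering and leaving data are touched instead of recomputing all 60 seconds on every 20-second slide.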
Step 4: Call the start method.
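/**
* Calling start on the StreamingContext internally invokes JobScheduler's start method, which runs a message loop.
* Inside JobScheduler.start, the JobGenerator and ReceiverTracker are constructed and their start methods are called:
* 1. Once started, JobGenerator keeps generating Jobs, one per batchDuration.
* 2. Once started, ReceiverTracker first launches the Receivers in the Spark cluster (in fact a ReceiverSupervisor
* is started first in the Executor). After a Receiver receives data, the data is stored in the Executor through the
* ReceiverSupervisor and its metadata is sent to the ReceiverTracker in the Driver, which manages the received
* metadata internally through ReceivedBlockTracker.
* Each batch interval produces a concrete Job. This Job is not a Job in the Spark Core sense; it is only the DAG of
* RDDs generated from the DStreamGraph. From a Java point of view it is like an instance of the Runnable interface.
* To run, the Job must be submitted to the JobScheduler, which uses a thread pool to pick a separate thread that
* submits the Job to the cluster (in fact, an RDD action executed in that thread triggers the real job).
* Why use a thread pool?
* 1. Jobs are generated continuously, so a thread pool improves efficiency; this is similar to how an Executor
* runs Tasks in a thread pool.
* 2. FAIR scheduling may be configured for Jobs, which also requires multiple threads.
*/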
ssc.start()
1. The source of start is as follows:
/**
* Start the execution of the streams.
*
* @throws IllegalStateException if the StreamingContext is already stopped.
*/
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
// Thread-local storage: each thread has its own private properties, so setting them here does not affect other threads.
ThreadUtils.runInNewThread("streaming-start") {
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
// Call JobScheduler's start method.
scheduler.start()
}
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
scheduler.stop(false)
state = StreamingContextState.STOPPED
throw e
}
StreamingContext.setActiveContext(this)
}
shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
// While a StreamingContext is active, a new StreamingContext must not be started, because Spark currently does not support multiple SparkContexts running at the same time in one JVM.
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
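The state handling in start() is a small state machine: starting is only meaningful from INITIALIZED, is a no-op when ACTIVE, and is an error once STOPPED. Here is a minimal sketch of that pattern alone; ContextLifecycle and Lifecycle are hypothetical names, and the sketch leaves out everything else the real method does (scheduler start, shutdown hook, metrics).

```scala
object ContextLifecycle {
  sealed trait State
  case object Initialized extends State
  case object Active extends State
  case object Stopped extends State

  final class Lifecycle {
    private var state: State = Initialized

    def current: State = synchronized { state }

    // Returns true only for the call that actually performs the start.
    def start(): Boolean = synchronized {
      state match {
        case Initialized =>
          state = Active
          true
        case Active =>
          false // already started: nothing to do
        case Stopped =>
          throw new IllegalStateException("already stopped")
      }
    }

    def stop(): Unit = synchronized { state = Stopped }
  }
}
```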
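2. Tracing into JobScheduler's start method, the source is as follows:
Starting the JobScheduler mainly involves the following steps:
An anonymous subclass of EventLoop is created, which handles the various JobScheduler events.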
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
// attach rate controllers of input streams to receive batch completion updates
for {
// Get each inputDStream
inputDStream <- ssc.graph.getInputStreams
// rateController can throttle the input rate
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
// Start the StreamingListenerBus, which is mainly used to update the content of the Streaming tab in the Spark UI.
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
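3. JobScheduler is the concrete class responsible for dynamic job scheduling.
JobScheduler is the scheduler for all Jobs; it uses a single thread that loops, listening for events such as a Job starting, completing, or failing.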
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
case JobStarted(job, startTime) => handleJobStart(job, startTime)
case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
case ErrorReported(m, e) => handleError(m, e)
}
} catch {
case e: Throwable =>
reportError("Error in job scheduler", e)
}
}
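The event loop idea (one thread taking events off a queue and dispatching them by pattern match) can be sketched in a few lines. This is an illustration under assumptions, not Spark's EventLoop class: EventLoopSketch, SimpleEventLoop and the Shutdown marker are hypothetical, and stopping via a marker event is a simplification chosen here so that the example terminates deterministically.

```scala
import java.util.concurrent.LinkedBlockingDeque

object EventLoopSketch {
  sealed trait SchedulerEvent
  final case class JobStarted(id: Int) extends SchedulerEvent
  final case class JobCompleted(id: Int) extends SchedulerEvent
  private case object Shutdown extends SchedulerEvent // hypothetical stop marker

  final class SimpleEventLoop(onReceive: SchedulerEvent => Unit) {
    private val queue = new LinkedBlockingDeque[SchedulerEvent]()

    // A single thread drains the queue, so events are handled one at a time, in order.
    private val thread = new Thread("simple-event-loop") {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Shutdown => running = false
            case event    => onReceive(event)
          }
        }
      }
    }
    thread.setDaemon(true)

    def start(): Unit = thread.start()

    def post(event: SchedulerEvent): Unit = queue.put(event)

    // Handle everything posted so far, then stop and wait for the thread to exit.
    def stop(): Unit = {
      queue.put(Shutdown)
      thread.join()
    }
  }
}
```

Because posting only enqueues, callers such as job handler threads never block on event processing.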
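4. The source of receiverTracker's start method is as follows:
The role of ReceiverTracker is to handle data reception, data caching, block generation, and related work.
ReceiverTracker starts the receivers by submitting a Job to the Executors in the cluster.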
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
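5. ReceiverTrackerEndpoint receives the messages sent by the Receivers.
Its receive method handles the messages; for StartAllReceivers it starts a Job for each receiver.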
/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
// TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
private val submitJobThreadPool = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))
private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))
@volatile private var active: Boolean = true
override def receive: PartialFunction[Any, Unit] = {
// Local messages
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
// Decide on which machines (executors) the receiver will be started
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
case RestartReceiver(receiver) =>
// Old scheduled executors minus the ones that are not active any more
val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
// Try global scheduling again
oldScheduledExecutors
} else {
val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
// Clear "scheduledLocations" to indicate we are going to do local scheduling
val newReceiverInfo = oldReceiverInfo.copy(
state = ReceiverState.INACTIVE, scheduledLocations = None)
receiverTrackingInfos(receiver.streamId) = newReceiverInfo
schedulingPolicy.rescheduleReceiver(
receiver.streamId,
receiver.preferredLocation,
receiverTrackingInfos,
getExecutors)
}
// Assume there is one receiver restarting at one time, so we don't need to update
// receiverTrackingInfos
startReceiver(receiver, scheduledLocations)
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
case UpdateReceiverRateLimit(streamUID, newRate) =>
for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
eP.send(UpdateRateLimit(newRate))
}
// Remote messages
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
}
6. startReceiver is called to start the receiver on the Executors; the receiver is launched through the wrapped function startReceiverFunc.
/**
* Start a receiver along with its scheduled executors
*/
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")
}
7. Inside startReceiver, the supervisor is started.
/** Start the supervisor */
def start() {
onStart()
startReceiver()
}
8. onStart() is called first; what actually runs is the subclass's onStart method.
/**
* Called when supervisor is started.
* Note that this must be called before the receiver.onStart() is called to ensure
* things like [[BlockGenerator]]s are started before the receiver starts sending data.
*/
protected def onStart() { }
9. That is, the onStart method of ReceiverSupervisorImpl.
override protected def onStart() {
registeredBlockGenerators.foreach { _.start() }
}
10. BlockGenerator's start method starts blockIntervalTimer and blockPushingThread.
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
if (state == Initialized) {
state = Active
blockIntervalTimer.start()
blockPushingThread.start()
logInfo("Started BlockGenerator")
} else {
throw new SparkException(
s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
}
}
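The division of labour between the interval timer and the pushing thread can be illustrated without threads. In the sketch below the timer tick and the push step are ordinary method calls, so the behaviour is deterministic; BlockGeneratorSketch, SimpleBlockGenerator and Block are hypothetical names and this is not Spark's BlockGenerator.

```scala
import scala.collection.mutable.ArrayBuffer

object BlockGeneratorSketch {
  final case class Block[T](id: Long, items: Vector[T])

  final class SimpleBlockGenerator[T] {
    private var currentBuffer = new ArrayBuffer[T]
    private var nextId = 0L
    private val pending = new ArrayBuffer[Block[T]]

    // Called by the receiver for every record it stores.
    def addData(item: T): Unit = synchronized {
      currentBuffer += item
      ()
    }

    // What the block interval timer would do on each tick:
    // seal the current buffer into a block and start a fresh buffer.
    def onTimerTick(): Unit = synchronized {
      if (currentBuffer.nonEmpty) {
        pending += Block(nextId, currentBuffer.toVector)
        nextId += 1
        currentBuffer = new ArrayBuffer[T]
      }
    }

    // What the pushing thread would do: hand the finished blocks onwards.
    def drainBlocks(): Vector[Block[T]] = synchronized {
      val blocks = pending.toVector
      pending.clear()
      blocks
    }
  }
}
```

An empty tick produces no block, so quiet periods do not create empty partitions downstream.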
11. Going back up, we now look at the call to ReceiverSupervisor.startReceiver.
/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
if (onReceiverStart()) {
logInfo("Starting receiver")
receiverState = Started
receiver.onStart()
logInfo("Called receiver onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case NonFatal(t) =>
stop("Error starting receiver " + streamId, Some(t))
}
}
12. onReceiverStart is implemented in the subclass ReceiverSupervisorImpl; it sends a RegisterReceiver message to the ReceiverTrackerEndpoint.
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
13. When ReceiverTrackerEndpoint receives this message, it calls the registerReceiver method.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// Remote messages
case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
val successful =
registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
context.reply(successful)
case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
walBatchingThreadPool.execute(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
if (active) {
context.reply(addBlock(receivedBlockInfo))
} else {
throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
}
}
})
At this point the startup of ReceiverTracker is complete. Now we return to our original code.
1. In JobScheduler's start method:
receiverTracker.start()
jobGenerator.start()
2. JobGenerator is started. JobGenerator is responsible for initializing the DStreamGraph, turning DStreams into RDDs, generating Jobs, submitting them for execution, and so on.
/** Start generation of jobs */
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
// See SPARK-10125
checkpointWriter
// eventLoop is the message loop that receives JobGeneratorEvent messages.
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start()
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
3. processEvent is called for each event; GenerateJobs events arrive once per batch interval.
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
4. The time passed to generateJobs is the batch time, which advances by the batch duration we specified.
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
// Allocate the blocks received up to this batch time to the batch.
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
// generateJobs produces the Jobs
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
// If the jobs were generated successfully, submit them to the JobScheduler.
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
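Two ideas in generateJobs are worth isolating: batch times fall on multiples of the batch duration, and job generation is wrapped in Try so that a failure is reported instead of killing the loop. The following is a minimal sketch of both; GenerateJobsSketch, firstBatchTime and generate are hypothetical names, jobs are represented as plain strings, and the alignment rule shown (next multiple of the period after the current time) is an assumption made for illustration.

```scala
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  // First tick strictly after `nowMs` that is a multiple of the batch period.
  def firstBatchTime(nowMs: Long, periodMs: Long): Long =
    (nowMs / periodMs + 1) * periodMs

  // Wrap job generation in Try and report the outcome instead of throwing.
  def generate(timeMs: Long)(buildJobs: Long => Seq[String]): Either[String, Seq[String]] =
    Try(buildJobs(timeMs)) match {
      case Success(jobs) => Right(jobs)
      case Failure(e) =>
        Left("Error generating jobs for time " + timeMs + ": " + e.getMessage)
    }
}
```

With Seconds(5) as the batch duration, a start at 12345 ms would give a first batch time of 15000 ms under this rule.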
5. submitJobSet submits the Jobs.
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
6. Each Job we submit is wrapped in a JobHandler.
private class JobHandler(job: Job) extends Runnable with Logging {
import JobScheduler._
def run() {
try {
val formattedTime = UIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
ssc.sc.setJobDescription(
s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
// We need to assign `eventLoop` to a temp variable. Otherwise, because
// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
// it's possible that when `post` is called, `eventLoop` happens to null.
var _eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobStarted(job, clock.getTimeMillis()))
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
}
} else {
// JobScheduler has been stopped.
}
} finally {
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
}
}
}
}
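The pattern in submitJobSet and JobHandler is: wrap each job in a Runnable, hand it to a thread pool, and record start and completion around the actual run. Below is a minimal sketch of that pattern with a plain java.util.concurrent pool; JobExecutorSketch, Job, JobHandler and runAll are hypothetical names, and the event log is a simple list of strings rather than messages posted to an event loop.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object JobExecutorSketch {
  // A streaming "job" here is just a named function to run.
  final case class Job(name: String, body: () => Unit)

  // Runnable wrapper that records start and completion around the job body.
  final class JobHandler(job: Job, events: ArrayBuffer[String]) extends Runnable {
    private def record(msg: String): Unit = events.synchronized {
      events += msg
      ()
    }

    override def run(): Unit = {
      record("started " + job.name)
      job.body()
      record("completed " + job.name)
    }
  }

  // Run all jobs on a pool of `concurrentJobs` threads and wait for them to finish.
  def runAll(jobs: Seq[Job], concurrentJobs: Int): List[String] = {
    val events = ArrayBuffer.empty[String]
    val pool = Executors.newFixedThreadPool(concurrentJobs)
    jobs.foreach(job => pool.execute(new JobHandler(job, events)))
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    events.synchronized { events.toList }
  }
}
```

With a pool size of 1 the jobs of a batch run strictly one after another; a larger pool lets jobs overlap, which is what FAIR scheduling needs.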
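[Figure: overall flow]
[Figure: InputDStream inheritance hierarchy]
Supplement:
Before the receivers are launched, Spark runs a job: the runDummySparkJob function makes sure the executors have registered, so that the Receivers do not all end up on a single node.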
/**
* Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
* receivers to be scheduled on the same node.
*
* TODO Should poll the executor number and wait for executors according to
* "spark.scheduler.minRegisteredResourcesRatio" and
* "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
*/
private def runDummySparkJob(): Unit = {
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
assert(getExecutors.nonEmpty)
}
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
runDummySparkJob()
logInfo("Starting " + receivers.length + " receivers")
// Provided that resources are available
// ReceiverTrackerEndpoint => endpoint
endpoint.send(StartAllReceivers(receivers))
}
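Why registered executors matter becomes clearer with a toy placement function. The sketch below spreads receiver ids over executors round-robin; it is only an illustration of "do not put all receivers on one node". ReceiverPlacementSketch and roundRobin are hypothetical, and this is not the algorithm of Spark's scheduling policy, which also takes preferred locations into account.

```scala
object ReceiverPlacementSketch {
  // Assign receiver ids to executors round-robin so that they spread out evenly.
  def roundRobin(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, String] = {
    require(executors.nonEmpty, "no executors registered yet")
    receiverIds.zipWithIndex.map { case (id, i) =>
      id -> executors(i % executors.length)
    }.toMap
  }
}
```

If only one executor has registered when placement happens, every receiver lands on it, which is the situation the dummy job is meant to avoid.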