Spark中Task數量的分析
本文主要說一下Spark中Task相關概念、RDD計算時Task的數量、Spark Streaming計算時Task的數量。
Task作為Spark作業執行的最小單位,Task的數量及執行快慢間接決定了作業執行的快慢。
開始
先說明一下Spark作業的幾個核心概念:
Job(作業):Spark根據行動操作觸發提交作業,以行動操作
將我們的程式碼切分為多個Job。
Stage(排程階段):每個Job中,又會根據寬依賴
將Job劃分為多個Stage(包括ShuffleMapStage和ResultStage)。
Task(任務):真正執行計算的部分。Stage相當於TaskSet,每個Stage內部包含了多個Task,將各個Task下發到各個Executor執行計算。
每個Task的處理邏輯完全一樣,不同的是對應處理的資料。即:移動計算而不是移動資料。
Partition(分割槽):這個是針對RDD而言的,RDD內部維護了分割槽列表,表示資料在叢集中存放的不同位置。
Job、Stage、Task的對應關係如下:
Task是真正幹活的,所以說是它間接決定了Spark程式的快慢也不過分。
再看看Spark任務提交時的幾個相關配置:
num-executors
:配置執行任務的Executor的數量。
executor-cores
:每個Executor的核的數量。此核非彼核,它不是機器的CPU核,可以理解為Executor的一個執行緒。
每個核同時只可以執行一個Task。
也就是說一個Spark應用同時執行的任務數 = 用於執行任務的Executor數 * 每個Executor的核數。
spark.executor.memory
:每個Executor的記憶體大小。
spark.default.parallelism
:RDD的預設分割槽數。
在我們沒有指定這個引數的前提下,如果是shuffle操作,這個值預設是父RDD中分割槽數較大的那個值;如果是普通操作,這個值的預設大小取決於叢集管理器(YARN, Local這些)。
以YARN為例,如果我們沒有指定,它的大小就是所有用於執行任務的Executor核的總數。
spark.sql.shuffle.partitions
RDD計算時Task的數量
在基於RDD計算時,Task的數量 = RDD的分割槽數。
所以調整RDD分割槽的數量就可以變相的調整Task的數量。
所以當RDD計算跑的很慢時,可以通過適當的調整RDD分割槽數來實現提速。
看看Spark.parallelize
生成RDD時的原始碼實現:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
// 這裡的taskScheduler.defaultParallelism就是
// 取的配值中spark.default.parallelism的值。
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
可以發現通過Spark.parallelize
建立的RDD分割槽,如果我們不指定分割槽數,那麼分割槽數就是由配置的spark.default.parallelism
來決定。
Spark讀Hive、HDFS時的Task數量
這塊之後補上來。。。
Spark Streaming流處理時的Task數量
Spark Streaming作為Spark中用於流處理的一員,它的原理就是執行一個接收器接收資料,然後將接收的資料按塊進行儲存,之後劃分Job,執行Task處理資料。
ok,Spark Streaming最後也會轉換成Task進行資料的處理,也就是Task執行速度也會影響它處理資料的速度。
Spark Streaming中Task的數量是由用來儲存接收到資料的Block數來決定的。
那麼只要控制Block的數量就可以控制Task的數量。
如下程式碼所示,Block是由一個定時器定時生成的。
// 塊生成間隔時間
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
// 一個定時器,按塊生成間隔時間定時根據接收到的資料生成塊。
private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
所以Block的數量 = 批處理間隔時間 / 塊生成間隔時間。
塊生成間隔時間是由配置spark.streaming.blockInterval
決定的,預設是200ms,最小是50ms。
所以當Spark Streaming的Task數量成為效能的瓶頸時,可以通過調整引數來調整Task的數量。
總結
1、Task是Spark的最小執行單位,Executor每個核同時只能執行一個Task。
2、RDD計算時,Task數量與分割槽數對應;Spark Streaming中,Task數量由Block數決定。
3、根據分配的資源以及作業的執行情況,適當調整Task數量。
4、移動計算而不是移動資料。
end. 個人理解,如果偏差歡迎指正。
個人公眾號:碼農峰,定時推送行業資訊,持續釋出原創技術文章,歡迎大家關注