1. 程式人生 > >Spark中Task數量的分析

Spark中Task數量的分析

本文主要說一下Spark中Task相關概念、RDD計算時Task的數量、Spark Streaming計算時Task的數量。

Task作為Spark作業執行的最小單位,Task的數量及執行快慢間接決定了作業執行的快慢。


開始

先說明一下Spark作業的幾個核心概念:

Job(作業):Spark根據行動操作觸發提交作業,以行動操作將我們的程式碼切分為多個Job。

Stage(排程階段):每個Job中,又會根據寬依賴將Job劃分為多個Stage(包括ShuffleMapStage和ResultStage)。

Task(任務):真正執行計算的部分。Stage相當於TaskSet,每個Stage內部包含了多個Task,將各個Task下發到各個Executor執行計算。

每個Task的處理邏輯完全一樣,不同的是對應處理的資料。即:移動計算而不是移動資料。

Partition(分割槽):這個是針對RDD而言的,RDD內部維護了分割槽列表,表示資料在叢集中存放的不同位置。

Job、Stage、Task的對應關係如下:

Task是真正幹活的,所以說是它間接決定了Spark程式的快慢也不過分。


再看看Spark任務提交時的幾個相關配置:

num-executors:配置執行任務的Executor的數量。

executor-cores:每個Executor的核的數量。此核非彼核,它不是機器的CPU核,可以理解為Executor的一個執行緒。

每個核同時只可以執行一個Task。

也就是說一個Spark應用同時執行的任務數 = 用於執行任務的Executor數 * 每個Executor的核數。

spark.executor.memory:每個Executor的記憶體大小。

spark.default.parallelism:RDD的預設分割槽數。

在我們沒有指定這個引數的前提下,如果是shuffle操作,這個值預設是父RDD中分割槽數較大的那個值;如果是普通操作,這個值的預設大小取決於叢集管理器(YARN, Local這些)。

以YARN為例,如果我們沒有指定,它的大小就是所有用於執行任務的Executor核的總數。

spark.sql.shuffle.partitions

:這個配置是針對於Spark SQL在shuffle時的預設分割槽數。預設值是200。只對Spark SQL起作用。

RDD計算時Task的數量

在基於RDD計算時,Task的數量 = RDD的分割槽數。

所以調整RDD分割槽的數量就可以變相的調整Task的數量。

所以當RDD計算跑的很慢時,可以通過適當的調整RDD分割槽數來實現提速。

看看Spark.parallelize生成RDD時的原始碼實現:

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

// 這裡的taskScheduler.defaultParallelism就是
// 取的配值中spark.default.parallelism的值。
def defaultParallelism: Int = {
    assertNotStopped()
        taskScheduler.defaultParallelism
}

可以發現通過Spark.parallelize建立的RDD分割槽,如果我們不指定分割槽數,那麼分割槽數就是由配置的spark.default.parallelism來決定。

Spark讀Hive、HDFS時的Task數量

這塊之後補上來。。。


Spark Streaming流處理時的Task數量

Spark Streaming作為Spark中用於流處理的一員,它的原理就是執行一個接收器接收資料,然後將接收的資料按塊進行儲存,之後劃分Job,執行Task處理資料。

ok,Spark Streaming最後也會轉換成Task進行資料的處理,也就是Task執行速度也會影響它處理資料的速度。


Spark Streaming中Task的數量是由用來儲存接收到資料的Block數來決定的。

那麼只要控制Block的數量就可以控制Task的數量。

如下程式碼所示,Block是由一個定時器定時生成的。

// 塊生成間隔時間
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
// 一個定時器,按塊生成間隔時間定時根據接收到的資料生成塊。
private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

所以Block的數量 = 批處理間隔時間 / 塊生成間隔時間。

塊生成間隔時間是由配置spark.streaming.blockInterval決定的,預設是200ms,最小是50ms。

所以當Spark Streaming的Task數量成為效能的瓶頸時,可以通過調整引數來調整Task的數量。

總結

1、Task是Spark的最小執行單位,Executor每個核同時只能執行一個Task。

2、RDD計算時,Task數量與分割槽數對應;Spark Streaming中,Task數量由Block數決定。

3、根據分配的資源以及作業的執行情況,適當調整Task數量。

4、移動計算而不是移動資料。


end. 個人理解,如果偏差歡迎指正。



個人公眾號:碼農峰,定時推送行業資訊,持續釋出原創技術文章,歡迎大家關注