1. 程式人生 > >《Spark 官方文件》Spark配置

《Spark 官方文件》Spark配置

Spark配置

Spark有以下三種方式修改配置:

  • Spark properties (Spark屬性)可以控制絕大多數應用程式引數,而且既可以通過 SparkConf 物件來設定,也可以通過Java系統屬性來設定。
  • Environment variables (環境變數)可以指定一些各個機器相關的設定,如IP地址,其設定方法是寫在每臺機器上的conf/spark-env.sh中。
  • Logging (日誌)可以通過log4j.properties配置日誌。

Spark屬性

Spark屬性可以控制大多數的應用程式設定,並且每個應用的設定都是分開的。這些屬性可以用SparkConf 物件直接設定。SparkConf為一些常用的屬性定製了專用方法(如,master URL和application name),其他屬性都可以用鍵值對做引數,呼叫set()方法來設定。例如,我們可以初始化一個包含2個本地執行緒的Spark應用,程式碼如下:

注意,local[2]代表2個本地執行緒 – 這是最小的併發方式,可以幫助我們發現一些只有在分散式上下文才能復現的bug。

val conf = new SparkConf()
             .setMaster("local[2]")
             .setAppName("CountingSheep")
val sc = new SparkContext(conf)

注意,本地模式下,我們可以使用n個執行緒(n >= 1)。而且在像Spark Streaming這樣的場景下,我們可能需要多個執行緒來防止類似執行緒餓死這樣的問題。

配置時間段的屬性應該寫明時間單位,如下格式都是可接受的:

25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)

配置大小的屬性也應該寫明單位,如下格式都是可接受的:

1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)

動態載入Spark屬性

在某些場景下,你可能需要避免將屬性值寫死在 SparkConf 中。例如,你可能希望在同一個應用上使用不同的master或不同的記憶體總量。Spark允許你簡單地建立一個空的SparkConf物件:

val sc = new SparkContext(new SparkConf())

然後在執行時設定這些屬性:

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
  --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

Spark shell和工具支援兩種動態載入配置的方法。第一種,通過命令列選項,如:上面提到的–master(設定master URL)。spark-submit可以在啟動Spark應用時,通過–conf標誌接受任何屬性配置,同時有一些特殊配置引數同樣可用(如,–master)。執行./bin/spark-submit –help可以展示這些選項的完整列表。

同時,bin/spark-submit 也支援從conf/spark-defaults.conf 中讀取配置選項,在該檔案中每行是一個鍵值對,並用空格分隔,如下:

spark.master            spark://5.6.7.8:7077
spark.executor.memory   4g
spark.eventLog.enabled  true
spark.serializer        org.apache.spark.serializer.KryoSerializer

這些通過引數或者屬性配置檔案傳遞的屬性,最終都會在SparkConf 中合併。其優先順序是:首先是SparkConf程式碼中寫的屬性值,其次是spark-submit或spark-shell的標誌引數,最後是spark-defaults.conf檔案中的屬性。

有一些配置項被重新命名過,這種情形下,老的名字仍然是可以接受的,只是優先順序比新名字優先順序低。

檢視Spark屬性

每個SparkContext都有其對應的Spark UI,所以Spark應用程式都能通過Spark UI檢視其屬性。預設你可以在這裡看到:http://<driver>:4040,頁面上的”Environment“ tab頁可以檢視Spark屬性。如果你真的想確認一下屬性設定是否正確的話,這個功能就非常有用了。注意,只有顯式地通過SparkConf物件、在命令列引數、或者spark-defaults.conf設定的引數才會出現在頁面上。其他屬性,你可以認為都是預設值。

可用的屬性

絕大多數屬性都有合理的預設值。這裡是部分常用的選項:

應用屬性

屬性名稱 預設值 含義
spark.app.name (none) Spark應用的名字。會在SparkUI和日誌中出現。
spark.driver.cores 1 在cluster模式下,用幾個core執行驅動器(driver)程序。
spark.driver.maxResultSize 1g Spark action運算元返回的結果最大多大。至少要1M,可以設為0表示無限制。如果結果超過這一大小,Spark作業(job)會直接中斷退出。但是,設得過高有可能導致驅動器OOM(out-of-memory)(取決於spark.driver.memory設定,以及驅動器JVM的記憶體限制)。設一個合理的值,以避免驅動器OOM。
spark.driver.memory 1g 驅動器程序可以用的記憶體總量(如:1g,2g)。
注意,在客戶端模式下,這個配置不能在SparkConf中直接設定(因為驅動器JVM都啟動完了呀!)。驅動器客戶端模式下,必須要在命令列裡用 –driver-memory 或者在預設屬性配置檔案裡設定。
spark.executor.memory 1g 單個執行器(executor)使用的記憶體總量(如,2g,8g)
spark.extraListeners (none) 逗號分隔的SparkListener子類的類名列表;初始化SparkContext時,這些類的例項會被創建出來,並且註冊到Spark的監聽總線上。如果這些類有一個接受SparkConf作為唯一引數的建構函式,那麼這個建構函式會被優先呼叫;否則,就呼叫無引數的預設建構函式。如果沒有建構函式,SparkContext建立的時候會拋異常。
spark.local.dir /tmp Spark的”草稿“目錄,包括map輸出的臨時檔案,或者RDD存在磁碟上的資料。這個目錄最好在本地檔案系統中,這樣讀寫速度快。這個配置可以接受一個逗號分隔的列表,通常用這種方式將檔案IO分散不同的磁碟上去。
注意:Spark-1.0及以後版本中,這個屬性會被叢集管理器所提供的環境變數覆蓋:SPARK_LOCAL_DIRS(獨立部署或Mesos)或者 LOCAL_DIRS(YARN)。
spark.logConf false SparkContext啟動時是否把生效的 SparkConf 屬性以INFO日誌列印到日誌裡
spark.master (none)

除了這些以外,以下還有很多可用的引數配置,在某些特定情形下,可能會用到:

執行時環境

屬性名 預設值 含義
spark.driver.extraClassPath (none) 額外的classpath,將插入到到驅動器的classpath開頭。
注意:驅動器如果執行客戶端模式下,這個配置不能通過SparkConf 在程式裡配置,因為這時候程式已經啟動呀!而是應該用命令列引數(–driver-class-path)或者在 conf/spark-defaults.conf 配置。
spark.driver.extraJavaOptions (none) 驅動器額外的JVM選項。如:GC設定或其他日誌引數。
注意:驅動器如果執行客戶端模式下,這個配置不能通過SparkConf在程式裡配置,因為這時候程式已經啟動呀!而是應該用命令列引數(–driver-java-options)或者conf/spark-defaults.conf 配置。
spark.driver.extraLibraryPath (none) 啟動驅動器JVM時候指定的依賴庫路徑。
注意:驅動器如果執行客戶端模式下,這個配置不能通過SparkConf在程式裡配置,因為這時候程式已經啟動呀!而是應該用命令列引數(–driver-library-path)或者conf/spark-defaults.conf 配置。
spark.driver.userClassPathFirst false (試驗性的:即未來不一定會支援該配置) 驅動器是否首選使用使用者指定的jars,而不是spark自身的。這個特性可以用來處理使用者依賴和spark本身依賴項之間的衝突。目前還是試驗性的,並且只能用在叢集模式下。
spark.executor.extraClassPath (none) 新增到執行器(executor)classpath開頭的classpath。主要為了向後相容老的spark版本,不推薦使用。
spark.executor.extraJavaOptions (none) 傳給執行器的額外JVM引數。如:GC設定或其他日誌設定等。注意,不能用這個來設定Spark屬性或者堆記憶體大小。Spark屬性應該用SparkConf物件,或者spark-defaults.conf檔案(會在spark-submit指令碼中使用)來配置。執行器堆記憶體大小應該用 spark.executor.memory配置。
spark.executor.extraLibraryPath (none) 啟動執行器JVM時使用的額外依賴庫路徑。
spark.executor.logs.rolling.maxRetainedFiles (none) Sets the number of latest rolling log files that are going to be retained by the system. Older log files will be deleted. Disabled by default.設定日誌檔案最大保留個數。老日誌檔案將被幹掉。預設禁用的。
spark.executor.logs.rolling.maxSize (none) 設定執行器日誌檔案大小上限。預設禁用的。
需要自動刪日誌請參考 spark.executor.logs.rolling.maxRetainedFiles.
spark.executor.logs.rolling.strategy (none) 執行器日誌滾動策略。預設禁用。
可接受的值有”time”(基於時間滾動) 或者 “size”(基於檔案大小滾動)。
time:將使用 spark.executor.logs.rolling.time.interval設定滾動時間間隔
size:將使用 spark.executor.logs.rolling.size.maxBytes設定檔案大小上限
spark.executor.logs.rolling.time.interval daily 設定執行器日誌滾動時間間隔。日誌滾動預設是禁用的。
可用的值有 “daily”, “hourly”, “minutely”,也可設為數字(則單位為秒)。
關於日誌自動清理,請參考 spark.executor.logs.rolling.maxRetainedFiles
spark.executor.userClassPathFirst false (試驗性的)與 spark.driver.userClassPathFirst類似,只不過這個引數將應用於執行器
spark.executorEnv.[EnvironmentVariableName] (none) 向執行器程序增加名為EnvironmentVariableName的環境變數。使用者可以指定多個來設定不同的環境變數。
spark.python.profile false 對Python worker啟用效能分析,效能分析結果會在sc.show_profile()中,或者在驅動器退出前展示。也可以用sc.dump_profiles(path)輸出到磁碟上。如果部分分析結果被手動展示過,那麼驅動器退出前就不再自動展示了。預設會使用pyspark.profiler.BasicProfiler,也可以自己傳一個profiler 類引數給SparkContext建構函式。
spark.python.profile.dump (none) 這個目錄是用來在驅動器退出前,dump效能分析結果。效能分析結果會按RDD分別dump。同時可以使用ptats.Stats()來裝載。如果制定了這個,那麼分析結果就不再自動展示。
spark.python.worker.memory 512m 聚合時每個python worker使用的記憶體總量,和JVM的記憶體字串格式相同(如,512m,2g)。如果聚合時使用的記憶體超過這個量,就將資料溢位到磁碟上。
spark.python.worker.reuse true 是否複用Python worker。如果是,則每個任務會啟動固定數量的Python worker,並且不需要fork() python程序。如果需要廣播的資料量很大,設為true能大大減少廣播資料量,因為需要廣播的程序數減少了。

混洗行為

屬性名 預設值 含義
spark.reducer.maxSizeInFlight 48m map任務輸出同時reduce任務獲取的最大記憶體佔用量。每個輸出需要建立buffer來接收,對於每個reduce任務來說,有一個固定的記憶體開銷上限,所以最好別設太大,除非你記憶體非常大。
spark.shuffle.compress true 是否壓縮map任務的輸出檔案。通常來說,壓縮是個好主意。使用的壓縮演算法取決於 spark.io.compression.codec
spark.shuffle.file.buffer 32k 每個混洗輸出流的記憶體buffer大小。這個buffer能減少混洗檔案的建立和磁碟定址。
spark.shuffle.io.maxRetries 3 (僅對netty)如果IO相關異常發生,重試次數(如果設為非0的話)。重試能是大量資料的混洗操作更加穩定,因為重試可以有效應對長GC暫停或者網路閃斷。
spark.shuffle.io.numConnectionsPerPeer 1 (僅netty)主機之間的連線是複用的,這樣可以減少大叢集中重複建立連線的次數。然而,有些叢集是機器少,磁碟多,這種叢集可以考慮增加這個引數值,以便充分利用所有磁碟併發效能。
spark.shuffle.io.preferDirectBufs true (僅netty)堆外快取可以有效減少垃圾回收和快取複製。對於堆外記憶體緊張的使用者來說,可以考慮禁用這個選項,以迫使netty所有記憶體都分配在堆上。
spark.shuffle.io.retryWait 5s (僅netty)混洗重試獲取資料的間隔時間。預設最大重試延遲是15秒,設定這個引數後,將變成maxRetries* retryWait。
spark.shuffle.manager sort 混洗資料的實現方式。可用的有”sort”和”hash“。基於排序(sort)的混洗記憶體利用率更高,並且從1.2開始已經是預設值了。
spark.shuffle.service.enabled false 啟用外部混洗服務。啟用外部混洗服務後,執行器生成的混洗中間檔案就由該服務保留,這樣執行器就可以安全的退出了。如果 spark.dynamicAllocation.enabled啟用了,那麼這個引數也必須啟用,這樣動態分配才能有外部混洗服務可用。
更多請參考:dynamic allocation configuration and setup documentation
spark.shuffle.service.port 7337 外部混洗服務對應埠
spark.shuffle.sort.bypassMergeThreshold 200 (高階)在基於排序(sort)的混洗管理器中,如果沒有map端聚合的話,就會最多存在這麼多個reduce分割槽。
spark.shuffle.spill.compress true 是否在混洗階段壓縮溢位到磁碟的資料。壓縮演算法取決於spark.io.compression.codec

Spark UI

屬性名 預設值 含義
spark.eventLog.compress false 是否壓縮事件日誌(當然spark.eventLog.enabled必須開啟)
spark.eventLog.dir file:///tmp/spark-events Spark events日誌的基礎目錄(當然spark.eventLog.enabled必須開啟)。在這個目錄中,spark會給每個應用建立一個單獨的子目錄,然後把應用的events log打到子目錄裡。使用者可以設定一個統一的位置(比如一個HDFS目錄),這樣history server就可以從這裡讀取歷史檔案。
spark.eventLog.enabled false 是否啟用Spark事件日誌。如果Spark應用結束後,仍需要在SparkUI上檢視其狀態,必須啟用這個。
spark.ui.killEnabled true 允許從SparkUI上殺掉stage以及對應的作業(job)
spark.ui.port 4040 SparkUI埠,展示應用程式執行狀態。
spark.ui.retainedJobs 1000 SparkUI和status API最多保留多少個spark作業的資料(當然是在垃圾回收之前)
spark.ui.retainedStages 1000 SparkUI和status API最多保留多少個spark步驟(stage)的資料(當然是在垃圾回收之前)
spark.worker.ui.retainedExecutors 1000 SparkUI和status API最多保留多少個已結束的執行器(executor)的資料(當然是在垃圾回收之前)
spark.worker.ui.retainedDrivers 1000 SparkUI和status API最多保留多少個已結束的驅動器(driver)的資料(當然是在垃圾回收之前)
spark.sql.ui.retainedExecutions 1000 SparkUI和status API最多保留多少個已結束的執行計劃(execution)的資料(當然是在垃圾回收之前)
spark.streaming.ui.retainedBatches 1000 SparkUI和status API最多保留多少個已結束的批量(batch)的資料(當然是在垃圾回收之前)

壓縮和序列化

屬性名 預設值 含義
spark.broadcast.compress true 是否在廣播變數前使用壓縮。通常是個好主意。
spark.closure.serializer org.apache.spark.serializer.
JavaSerializer
閉包所使用的序列化類。目前只支援Java序列化。
spark.io.compression.codec snappy 內部資料使用的壓縮演算法,如:RDD分割槽、廣播變數、混洗輸出。Spark提供了3中演算法:lz4,lzf,snappy。你也可以使用全名來指定壓縮演算法:org.apache.spark.io.LZ4CompressionCodec,
org.apache.spark.io.LZFCompressionCodec,
org.apache.spark.io.SnappyCompressionCodec.
spark.io.compression.lz4.blockSize 32k LZ4演算法使用的塊大小。當然你需要先使用LZ4壓縮。減少塊大小可以減少混洗時LZ4算法佔用的記憶體量。
spark.io.compression.snappy.blockSize 32k Snappy演算法使用的塊大小(先得使用Snappy演算法)。減少塊大小可以減少混洗時Snappy算法佔用的記憶體量。
spark.kryo.classesToRegister (none) 如果你使用Kryo序列化,最好指定這個以提高效能(tuning guide)。
本引數接受一個逗號分隔的類名列表,這些類都會註冊為Kryo可序列化型別。
spark.kryo.referenceTracking true (false when using Spark SQL Thrift Server) 是否跟蹤同一物件在Kryo序列化的引用。如果你的物件圖中有迴圈護著包含統一物件的多份拷貝,那麼最好啟用這個。如果沒有這種情況,那就禁用以提高效能。
spark.kryo.registrationRequired false Kryo序列化時,是否必須事先註冊。如果設為true,那麼Kryo遇到沒有註冊過的型別,就會拋異常。如果設為false(預設)Kryo會序列化未註冊型別的物件,但會有比較明顯的效能影響,所以啟用這個選項,可以強制必須在序列化前,註冊可序列化型別。
spark.kryo.registrator (none) 如果你使用Kryo序列化,用這個class來註冊你的自定義型別。如果你需要自定義註冊方式,這個引數很有用。否則,使用 spark.kryo.classesRegister更簡單。要設定這個引數,需要用tuning guide 。
spark.kryoserializer.buffer.max 64m 最大允許的Kryo序列化buffer。必須必你所需要序列化的物件要大。如果你在Kryo中看到”buffer limit exceeded”這個異常,你就得增加這個值了。
spark.kryoserializer.buffer 64k Kryo序列化的初始buffer大小。注意,每臺worker上對應每個core會有一個buffer。buffer最大增長到 spark.kryoserializer.buffer.max
spark.rdd.compress false 是否壓縮序列化後RDD的分割槽(如:StorageLevel.MEMORY_ONLY_SER)。能節省大量空間,但多消耗一些CPU。
spark.serializer org.apache.spark.serializer.
JavaSerializer (org.apache.spark.serializer.
KryoSerializer when using Spark SQL Thrift Server)
spark.serializer.objectStreamReset 100 如果使用org.apache.spark.serializer.JavaSerializer做序列化器,序列化器快取這些物件,以避免輸出多餘資料,然而,這個會打斷垃圾回收。通過呼叫reset來flush序列化器,從而使老物件被回收。要禁用這一週期性reset,需要把這個引數設為-1,。預設情況下,序列化器會每過100個物件,被reset一次。

記憶體管理

屬性名 預設值 含義
spark.memory.fraction 0.75 堆記憶體中用於執行、混洗和儲存(快取)的比例。這個值越低,則執行中溢位到磁碟越頻繁,同時快取被逐出記憶體也更頻繁。這個配置的目的,是為了留出使用者自定義資料結構、內部元資料使用的記憶體。推薦使用預設值。請參考this description.
spark.memory.storageFraction 0.5 不會被逐出記憶體的總量,表示一個相對於 spark.memory.fraction的比例。這個越高,那麼執行混洗等操作用的記憶體就越少,從而溢位磁碟就越頻繁。推薦使用預設值。更詳細請參考 this description.
spark.memory.offHeap.enabled true 如果true,Spark會嘗試使用堆外記憶體。啟用 後,spark.memory.offHeap.size必須為正數。
spark.memory.offHeap.size 0 堆外記憶體分配的大小(絕對值)。這個設定不會影響堆記憶體的使用,所以你的執行器總記憶體必須適應JVM的堆記憶體大小。必須要設為正數。並且前提是 spark.memory.offHeap.enabled=true.
spark.memory.useLegacyMode false 是否使用老式的記憶體管理模式(1.5以及之前)。老模式在堆記憶體管理上更死板,使用固定劃分的區域做不同功能,潛在的會導致過多的資料溢位到磁碟(如果不小心調整效能)。必須啟用本引數,以下選項才可用:
spark.shuffle.memoryFraction
spark.storage.memoryFraction
spark.storage.unrollFraction
spark.shuffle.memoryFraction 0.2 (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。
混洗階段用於聚合和協同分組的JVM堆記憶體比例。在任何指定的時間,所有用於混洗的記憶體總和不會超過這個上限,超出的部分會溢位到磁碟上。如果溢位臺頻繁,考慮增加spark.storage.memoryFraction的大小。
spark.storage.memoryFraction 0.6 (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。
Spark用於快取資料的對記憶體比例。這個值不應該比JVM 老生代(old generation)物件所佔用的記憶體大,預設是60%的堆記憶體,當然你可以增加這個值,同時配置你所用的老生代物件佔用記憶體大小。
spark.storage.unrollFraction 0.2 (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。
Spark塊展開的記憶體佔用比例。如果沒有足夠的記憶體來完整展開新的塊,那麼老的塊將被拋棄。

執行行為

屬性名 預設值 含義
spark.broadcast.blockSize 4m TorrentBroadcastFactory每個分片大小。太大會減少廣播時候的併發數(更慢了);如果太小,BlockManager可能會給出效能提示。
spark.broadcast.factory org.apache.spark.broadcast.
TorrentBroadcastFactory
廣播演算法的實現。
spark.cleaner.ttl (infinite) Spark記住任意元資料的保留時間(秒)。週期性的清理能保證比這個更老的元資料將被遺忘(刪除)。這對於長期執行的Spark作業非常有用(如,一些7*24執行)。注意,RDD持久化到記憶體中後,過了這麼長時間以後,也會被清理掉(這。。。是不是有點坑!)。
spark.executor.cores YARN模式下預設1;如果是獨立部署,則是worker節點上所有可用的core。 單個執行器可用的core數。僅針對YARN和獨立部署模式。獨立部署時,單個worker節點上會執行多個執行器(executor),只要worker上有足夠的core。否則,每個應用在單個worker上只會啟動一個執行器。
spark.default.parallelism 對於reduceByKey和join這樣的分散式混洗(shuffle)運算元,等於父RDD中最大的分割槽。對於parallelize這樣沒有父RDD的運算元,則取決於叢集管理器:
  • Local mode: number of cores on the local machine — 本地模式:機器的core數
  • Mesos fine grained mode: 8 — Mesos細粒度模式:8
  • Others: total number of cores on all executor nodes or 2, whichever is larger — 其他:所有執行器節點上core的數量 或者 2,這兩數取較大的
如果使用者沒有在引數裡指定,這個屬性是預設的RDD transformation運算元分割槽數,如:join,reduceByKey,parallelize等。
spark.executor.heartbeatInterval 10s 執行器心跳間隔(報告心跳給驅動器)。心跳機制使驅動器瞭解哪些執行器還活著,並且可以從心跳資料中獲得執行器的度量資料。
spark.files.fetchTimeout 60s 獲取檔案的通訊超時,所獲取的檔案是通過在驅動器上呼叫SparkContext.addFile()新增的。
spark.files.useFetchCache true 如果設為true(預設),則同一個spark應用的不同執行器之間,會使用一二共享快取來拉取檔案,這樣可以提升同一主機上執行多個執行器時候,任務啟動的效能。如果設為false,這個優化就被禁用,各個執行器將使用自己獨有的快取,他們拉取的檔案也是各自有一份拷貝。如果在NFS檔案系統上使用本地檔案系統,可以禁用掉這個優化(參考SPARK-6313
spark.files.overwrite false SparkContext.addFile()新增的檔案已經存在,且內容不匹配的情況下,是否覆蓋。
spark.hadoop.cloneConf false 如設為true,對每個任務複製一份Hadoop Configuration物件。啟用這個可以繞過Configuration執行緒安全問題(SPARK-2546 )。預設這個是禁用的,很多job並不會受這個issue的影響。
spark.hadoop.validateOutputSpecs true 如設為true,在saveAsHadoopFile及其變體的時候,將會驗證輸出(例如,檢查輸出目錄是否存在)。對於已經驗證過或確認存在輸出目錄的情況,可以禁用這個。我們建議不要禁用,除非你確定需要和之前的spark版本相容。可以簡單的利用Hadoop 檔案系統API手動刪掉已存在的輸出目錄。這個設定會被Spark Streaming StreamingContext生成的job忽略,因為Streaming需要在回覆檢查點的時候,覆蓋已有的輸出目錄。
spark.storage.memoryMapThreshold 2m spark從磁碟上讀取一個塊後,對映到記憶體塊的最小大小。這阻止了spark對映過小的記憶體塊。通常,記憶體對映塊是有開銷的,應該比接近或小於作業系統的頁大小。
spark.externalBlockStore.blockManager org.apache.spark.storage.TachyonBlockManager 用於儲存RDD的外部塊管理器(檔案系統)的實現。
檔案系統URL由spark.externalBlockStore.url決定。
spark.externalBlockStore.baseDir System.getProperty(“java.io.tmpdir”) 外部塊儲存存放RDD的目錄。檔案系統URL由spark.externalBlockStore.url決定。也可以是逗號分隔的目錄列表(Tachyon檔案系統)
spark.externalBlockStore.url tachyon://localhost:19998 for Tachyon 所使用的外部塊儲存檔案系統URL。

網路

屬性名 預設值 含義
spark.akka.frameSize 128 “control plane” 通訊中所允許的最大訊息大小(MB)。通常,只應用於map輸出資料的大小資訊,這些資訊會在執行器和驅動器之間傳遞。如果你的job包含幾千個map和reduce任務,你可能需要增大這個設定。
spark.akka.heartbeat.interval 1000s 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。設成較大的值可以減少網路開銷,而較小的值(1秒左右)可能會對Akka的失敗檢測更有用。如有需要,可以調整這個值和spark.akka.heartbeat.pauses的組合。一種可能需要使用失敗檢測的情形是:用一個敏感的失敗檢測,可以快速識別並逐出不穩定的執行器。然而,在真實的spark叢集中,這通常不是GC暫停或網路延遲造成的。除此之外,啟用這個還會導致過多的心跳資料交換,從而造成網路洪峰。
spark.akka.heartbeat.pauses 6000s 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。這個是可接受的Akka心跳暫停時間。這個可以用來控制對GC暫停敏感程度。如有需要,可以調整這個值和spark.akka.heartbeat.interval的組合。
spark.akka.threads 4 用於通訊的actor執行緒數。如果驅動器機器上有很多CPU core,你可以適當增大這個值。
spark.akka.timeout 100s Spark節點之間通訊超時。
spark.blockManager.port (random) 塊管理器(block manager)監聽埠。在驅動器和執行器上都有。
spark.broadcast.port (random) 驅動器HTTP廣播server監聽埠。這和torrent廣播沒有關係。
spark.driver.host (local hostname) 驅動器主機名。用於和執行器以及獨立部署時叢集master通訊。
spark.driver.port (random) 驅動器埠。用於和執行器以及獨立部署時叢集master通訊。
spark.executor.port (random) 執行器埠。用於和驅動器通訊。
spark.fileserver.port (random) 驅動器HTTP檔案server監聽埠。
spark.network.timeout 120s 所有網路互動的預設超時。這個配置是以下屬性的預設值:
spark.core.connection.ack.wait.timeout,
spark.akka.timeout,
spark.storage.blockManagerSlaveTimeoutMs,
spark.shuffle.io.connectionTimeout,spark.rpc.askTimeout or
spark.rpc.lookupTimeout
spark.port.maxRetries 16 繫結一個埠的最大重試次數。如果指定了一個埠(非0),每個後續重試會在之前嘗試的埠基礎上加1,然後再重試繫結。本質上,這確定了一個繫結埠的範圍,就是 [start port, start port + maxRetries]
spark.replClassServer.port (random) 驅動器HTTP class server的監聽埠。只和spark shell相關。
spark.rpc.numRetries 3 RPC任務最大重試次數。RPC任務最多重試這麼多次。
spark.rpc.retry.wait 3s RPC請求操作重試前等待時間。
spark.rpc.askTimeout 120s RPC請求操作超時等待時間。
spark.rpc.lookupTimeout 120s RPC遠端端點查詢超時。

排程

屬性名 預設值 含義
spark.cores.max (not set) 如果執行在獨立部署叢集模式(standalone deploy cluster)或者Mesos叢集粗粒度共享模式(Mesos cluster in “coarse-grained” sharing mode),這個值決定了spark應用可以使用的最大CPU總數(應用在整個叢集中可用CPU總數,而不是單個機器)。如果不設定,那麼獨立部署時預設為spark.deploy.defaultCores,Mesos叢集則預設無限制(即所有可用的CPU)。
spark.locality.wait 3s 為了資料本地性最長等待時間(spark會根據資料所在位置,儘量讓任務也啟動於相同的節點,然而可能因為該節點上資源不足等原因,無法滿足這個任務分配,spark最多等待這麼多時間,然後放棄資料本地性)。資料本地性有多個級別,每一級別都是等待這麼多時間(同一程序、同一節點、同一機架、任意)。你也可以為每個級別定義不同的等待時間,需要設定spark.locality.wait.node等。如果你發現任務資料本地性不佳,可以增加這個值,但通常預設值是ok的。
spark.locality.wait.node spark.locality.wait 單獨定義同一節點資料本地性任務等待時間。你可以設為0,表示忽略節點本地性,直接跳到下一級別,即機架本地性(如果你的叢集有機架資訊)。
spark.locality.wait.process spark.locality.wait 單獨定義同一程序資料本地性任務等待時間。這個引數影響試圖訪問特定執行器上的快取資料的任務。
spark.locality.wait.rack spark.locality.wait 單獨定義同一機架資料本地性等待時間。
spark.scheduler.maxRegisteredResourcesWaitingTime 30s 排程開始前,向叢集管理器註冊使用資源的最大等待時間。
spark.scheduler.minRegisteredResourcesRatio 0.8 for YARN mode;
0.0 for standalone mode and Mesos coarse-grained mode
排程啟動前,需要註冊得到資源的最小比例(註冊到的資源數 / 需要資源總數)(YARN模式下,資源是執行器;獨立部署和Mesos粗粒度模式下時資源是CPU core【spark.cores.max是期望得到的資源總數】)。可以設為0.0~1.0的一個浮點數。不管job是否得到了最小資源比例,最大等待時間都是由spark.scheduler.maxRegisteredResourcesWaitingTime控制的。
spark.scheduler.mode FIFO 提交到同一個SparkContext上job的排程模式(scheduling mode)。另一個可接受的值是FAIR,而FIFO只是簡單的把job按先來後到排隊。對於多使用者服務很有用。
spark.scheduler.revive.interval 1s 排程器復活worker的間隔時間。
spark.speculation false 如果設為true,將會啟動推測執行任務。這意味著,如果stage中有任務執行較慢,他們會被重新排程到別的節點上執行。
spark.speculation.interval 100ms Spark檢查慢任務的時間間隔。
spark.speculation.multiplier 1.5 比任務平均執行時間慢多少倍的任務會被認為是慢任務。
spark.speculation.quantile 0.75 對於一個stage來說,完成多少百分比才開始檢查慢任務,並啟動推測執行任務。
spark.task.cpus 1 每個任務分配的CPU core。
spark.task.maxFailures 4 單個任務最大失敗次數。應該>=1。最大重試次數 = spark.task.maxFailures – 1

動態分配

屬性名 預設值 含義
spark.dynamicAllocation.enabled false 是否啟用動態資源分配特性,啟用後,執行器的個數會根據工作負載動態的調整(增加或減少)。注意,目前在YARN模式下不用。更詳細資訊,請參考:here該特性依賴於 spark.shuffle.service.enabled 的啟用。同時還和以下配置相關:spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors以及 spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation
.executorIdleTimeout
60s 動態分配特性啟用後,空閒時間超過該配置時間的執行器都會被移除。更詳細請參考這裡:description
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 動態分配特性啟用後,包含快取資料的執行器如果空閒時間超過該配置設定的時間,則被移除。更詳細請參考:description
spark.dynamicAllocation.initialExecutors spark
.dynamicAllocation
.minExecutors
動態分配開啟後,執行器的初始個數
spark.dynamicAllocation.maxExecutors infinity 動態分配開啟後,執行器個數的上限
spark.dynamicAllocation.minExecutors 0 動態分配開啟後,執行器個數的下限
spark.dynamicAllocation.schedulerBacklogTimeout 1s 動態分配啟用後,如果有任務積壓的持續時間長於該配置設定的時間,則申請新的執行器。更詳細請參考:description
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout 和spark.dynamicAllocation.schedulerBacklogTimeout類似,只不過該配置對應於隨後持續的執行器申請。更詳細請參考: description

安全

屬性名 預設值 含義
spark.acls.enable false 是否啟用Spark acls(訪問控制列表)。如果啟用,那麼將會檢查使用者是否有許可權檢視或修改某個作業(job)。注意,檢查的前提是需要知道使用者是誰,所以如果使用者是null,則不會做任何檢查。你可以在Spark UI上設定過濾器(Filters)來做使用者認證,並設定使用者名稱。
spark.admin.acls Empty 逗號分隔的使用者列表,在該列表中的使用者/管理員將能夠訪問和修改所有的Spark作業(job)。如果你的叢集是共享的,並且有叢集管理員,還有需要除錯的開發人員,那麼這個配置會很有用。如果想讓所有人都有管理員許可權,只需把該配置設定為”*”
spark.authenticate false 設定Spark是否認證叢集內部連線。如果不是在YARN上執行,請參考 spark.authenticate.secret
spark.authenticate.secret None 設定Spark用於內部元件認證的祕鑰。如果不是在YARN上執行,且啟用了 spark.authenticate,那麼該配置必須設定
spark.authenticate.enableSaslEncryption false 是否對Spark內部元件認證使用加密通訊。該配置目前只有 block transfer service 使用。
spark.network.sasl.serverAlwaysEncrypt false 是否對支援SASL認證的service禁用非加密通訊。該配置目前只有 external shuffle service 支援。
spark.core.connection.ack.wait.timeout 60s 網路連線等待應答訊號的超時時間。為了避免由於GC等導致的意外超時,你可以設定一個較大的值。
spark.core.connection.auth.wait.timeout 30s 網路連線等待認證的超時時間。
spark.modify.acls Empty 逗號分隔的使用者列表,在改列表中的使用者可以修改Spark作業。預設情況下,只有啟動該Spark作業的使用者可以修改之(比如殺死該作業)。如果想要任何使用者都可以修改作業,請將該配置設定為”*”
spark.ui.filters None 逗號分隔的過濾器class列表,這些過濾器將用於Spark web UI。這裡的過濾器應該是一個標準的javax servlet Filter 。每個過濾器的引數可以通過java系統屬性來設定,如下:
spark.<class name of filer>.params=’param1=value1,param2=value2’例如:
-Dspark.ui.filters=com.test.filter1
-Dspark.com.test.filter1.params=’param1=foo,param2=testing’
spark.ui.view.acls Empty 逗號分隔的使用者列表,在該列表中的使用者可以檢視Spark web UI。預設,只有啟動該Spark作業的使用者可以檢視之。如果需要讓所有使用者都能檢視,只需將該配置設為”*”

加密

屬性名 預設值 含義
spark.ssl.enabled false 是否啟用SSL連線(在所有所支援的協議上)。所有SSL相關配置(spark.ssl.xxx,其中xxx是一個特定的配置屬性),都是全域性的。如果需要在某些協議上覆蓋全域性設定,那麼需要在該協議名稱空間上進行單獨配置。使用 spark.ssl.YYY.XXX 來為協議YYY覆蓋全域性配置XXX。目前YYY的可選值有 akka(用於基於AKKA框架的網路連線) 和 fs(用於應廣播和檔案伺服器)
spark.ssl.enabledAlgorithms Empty 逗號分隔的加密演算法列表。這些加密演算法必須是JVM所支援的。這裡有個可用加密演算法參考列表: this
spark.ssl.keyPassword None 在key-store中私匙對應的密碼。
spark.ssl.keyStore None key-store檔案路徑。可以是絕對路徑,或者以本元件啟動的工作目錄為基礎的相對路徑。
spark.ssl.keyStorePassword None key-store的密碼。
spark.ssl.protocol None 協議名稱。該協議必須是JVM所支援的。這裡有JVM支援的協議參考列表:this
spark.ssl.trustStore None trust-store檔案路徑。可以是絕對路徑,或者以本元件啟動的工作目錄為基礎的相對路徑。
spark.ssl.trustStorePassword None trust-store的密碼

Spark Streaming [流式]

屬性名 預設值 含義
spark.streaming.backpressure.enabled false 是否啟用Spark Streaming 的內部反壓機制(spark 1.5以上支援)。啟用後,Spark Streaming會根據當前批次的排程延遲和處理時長來控制接收速率,這樣一來,系統的接收速度會和處理速度相匹配。該特性會在內部動態地設定接收速率。該速率的上限將由 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition 決定(如果它們設定了的話)。
spark.streaming.blockInterval 200ms 在將資料儲存到Spark之前,Spark Streaming接收器組裝資料塊的時間間隔。建議不少於50ms。關於Spark Streaming程式設計指南細節,請參考 performance tuning 這一節。
spark.streaming.receiver.maxRate not set 接收速度的最大速率(每秒記錄條數)。實際上,每個流每秒將消費這麼多條記錄。設定為0或者負數表示不限制速率。更多細節請參考: deployment guide
spark.streaming.receiver.writeAheadLog.enable false 是否啟用接收器預寫日誌。所有的輸入資料都會儲存到預寫日誌中,這樣在驅動器失敗後,可以基於預寫日誌來恢復資料。更詳細請參考:deployment guide
spark.streaming.unpersist true 是否強制Spark Streaming 自動從記憶體中清理掉所生成並持久化的RDD。同時,Spark Streaming收到的原始資料也將會被自動清理掉。如果設定為false,那麼原始資料以及持久化的RDD將不會被自動清理,以便外部程式可以訪問這些資料。當然,這將導致Spark消耗更多的記憶體。
spark.streaming.stopGracefullyOnShutdown false 如果設為true,Spark將會在JVM關閉時,優雅地關停StreamingContext,而不是立即關閉之。
spark.streaming.kafka.maxRatePerPartition not set 在使用Kafka direct stream API時,從每個Kafka資料分割槽讀取資料的最大速率(每秒記錄條數)。更詳細請參考:Kafka Integration guide
spark.streaming.kafka.maxRetries 1 驅動器連續重試的最大次數,這個配置是為了讓驅動器找出每個Kafka分割槽上的最大offset(預設值為1,意味著驅動器將最多嘗試2次)。只對新的Kafka direct stream API有效。
spark.streaming.ui.retainedBatches 1000 Spark Streaming UI 以及 status API 中保留的最大批次個數。

SparkR

屬性名 預設值 含義
spark.r.numRBackendThreads 2 SparkR RBackEnd處理RPC呼叫的後臺執行緒數
spark.r.command Rscript 叢集模式下,驅動器和worker上執行的R指令碼可執行檔案
spark.r.driver.command spark.r.command client模式的驅動器執行的R指令碼。叢集模式下會忽略

叢集管理器

每個叢集管理器都有一些額外的配置選項。詳細請參考這裡:

YARN
Mesos

環境變數

有些Spark設定需要通過環境變數來設定,這些環境變數可以在${SPARK_HOME}/conf/spark-env.sh指令碼(Windows下是conf/spark-env.cmd)中設定。如果是獨立部署或者Mesos模式,這個檔案可以指定機器相關資訊(如hostname)。執行本地Spark應用或者submit指令碼時,也會引用這個檔案。

注意,conf/spark-env.sh預設是不存在的。你需要複製conf/spark-env.sh.template這個模板來建立,還有注意給這個檔案附上可執行許可權。

以下變數可以在spark-env.sh中設定:

環境變數 含義
JAVA_HOME Java安裝目錄(如果沒有在PATH變數中指定)
PYSPARK_PYTHON 驅動器和worker上使用的Python二進位制可執行檔案(預設是python)
PYSPARK_DRIVER_PYTHON 僅在驅動上使用的Python二進位制可執行檔案(默認同PYSPARK_PYTHON)
SPARKR_DRIVER_R SparkR shell使用的R二進位制可執行檔案(預設是R)
SPARK_LOCAL_IP 本地繫結的IP
SPARK_PUBLIC_DNS Spark程式公佈給其他機器的hostname

另外,還有一些選項需要在Spark standalone cluster scripts裡設定,如:每臺機器上使用的core數量,和最大記憶體佔用量。

spark-env.sh是一個shell指令碼,因此一些引數可以通過程式設計方式來設定 – 例如,你可以獲取本機IP來設定SPARK_LOCAL_IP。

日誌配置

Spark使用log4j 打日誌。你可以在conf目錄下用log4j.properties來配置。複製該目錄下已有的log4j.properties.template並改名為log4j.properties即可。

覆蓋配置目錄

預設Spark配置目錄是”${SPARK_HOME}/conf”,你也可以通過 ${SPARK_CONF_DIR}指定其他目錄。Spark會從這個目錄下讀取配置檔案(spark-defaults.conf,spark-env.sh,log4j.properties等)

繼承Hadoop叢集配置

如果你打算用Spark從HDFS讀取資料,那麼有2個Hadoop配置檔案必須放到Spark的classpath下:

  • hdfs-site.xml,配置HDFS客戶端的預設行為
  • core-site.xml,預設檔案系統名

這些配置檔案的路徑在不同釋出版本中不太一樣(如CDH和HDP版本),但通常都能在 ${HADOOP_HOME}/etc/hadoop/conf目錄下找到。一些工具,如Cloudera Manager,可以動態修改配置,而且提供了下載一份拷貝的機制。

要想讓這些配置對Spark可見,請在${SPARK_HOME}/spark-env.sh中設定HADOOP_CONF_DIR變數。