《Spark 官方文件》Spark配置
Spark配置
Spark有以下三種方式修改配置:
- Spark properties (Spark屬性)可以控制絕大多數應用程式引數,而且既可以通過 SparkConf 物件來設定,也可以通過Java系統屬性來設定。
- Environment variables (環境變數)可以指定一些各個機器相關的設定,如IP地址,其設定方法是寫在每臺機器上的conf/spark-env.sh中。
- Logging (日誌)可以通過log4j.properties配置日誌。
Spark屬性
Spark屬性可以控制大多數的應用程式設定,並且每個應用的設定都是分開的。這些屬性可以用SparkConf 物件直接設定。SparkConf為一些常用的屬性定製了專用方法(如,master URL和application name),其他屬性都可以用鍵值對做引數,呼叫set()方法來設定。例如,我們可以初始化一個包含2個本地執行緒的Spark應用,程式碼如下:
注意,local[2]代表2個本地執行緒 – 這是最小的併發方式,可以幫助我們發現一些只有在分散式上下文才能復現的bug。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
注意,本地模式下,我們可以使用n個執行緒(n >= 1)。而且在像Spark Streaming這樣的場景下,我們可能需要多個執行緒來防止類似執行緒餓死這樣的問題。
配置時間段的屬性應該寫明時間單位,如下格式都是可接受的:
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
配置大小的屬性也應該寫明單位,如下格式都是可接受的:
1b (bytes) 1k or 1kb (kibibytes = 1024 bytes) 1m or 1mb (mebibytes = 1024 kibibytes) 1g or 1gb (gibibytes = 1024 mebibytes) 1t or 1tb (tebibytes = 1024 gibibytes) 1p or 1pb (pebibytes = 1024 tebibytes)
動態載入Spark屬性
在某些場景下,你可能需要避免將屬性值寫死在 SparkConf 中。例如,你可能希望在同一個應用上使用不同的master或不同的記憶體總量。Spark允許你簡單地建立一個空的SparkConf物件:
val sc = new SparkContext(new SparkConf())
然後在執行時設定這些屬性:
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Spark shell和工具支援兩種動態載入配置的方法。第一種,通過命令列選項,如:上面提到的–master(設定master URL)。spark-submit可以在啟動Spark應用時,通過–conf標誌接受任何屬性配置,同時有一些特殊配置引數同樣可用(如,–master)。執行./bin/spark-submit –help可以展示這些選項的完整列表。
同時,bin/spark-submit 也支援從conf/spark-defaults.conf 中讀取配置選項,在該檔案中每行是一個鍵值對,並用空格分隔,如下:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
這些通過引數或者屬性配置檔案傳遞的屬性,最終都會在SparkConf 中合併。其優先順序是:首先是SparkConf程式碼中寫的屬性值,其次是spark-submit或spark-shell的標誌引數,最後是spark-defaults.conf檔案中的屬性。
有一些配置項被重新命名過,這種情形下,老的名字仍然是可以接受的,只是優先順序比新名字優先順序低。
檢視Spark屬性
每個SparkContext都有其對應的Spark UI,所以Spark應用程式都能通過Spark UI檢視其屬性。預設你可以在這裡看到:http://<driver>:4040,頁面上的”Environment“ tab頁可以檢視Spark屬性。如果你真的想確認一下屬性設定是否正確的話,這個功能就非常有用了。注意,只有顯式地通過SparkConf物件、在命令列引數、或者spark-defaults.conf設定的引數才會出現在頁面上。其他屬性,你可以認為都是預設值。
可用的屬性
絕大多數屬性都有合理的預設值。這裡是部分常用的選項:
應用屬性
屬性名稱 | 預設值 | 含義 |
---|---|---|
spark.app.name |
(none) | Spark應用的名字。會在SparkUI和日誌中出現。 |
spark.driver.cores |
1 | 在cluster模式下,用幾個core執行驅動器(driver)程序。 |
spark.driver.maxResultSize |
1g | Spark action運算元返回的結果最大多大。至少要1M,可以設為0表示無限制。如果結果超過這一大小,Spark作業(job)會直接中斷退出。但是,設得過高有可能導致驅動器OOM(out-of-memory)(取決於spark.driver.memory設定,以及驅動器JVM的記憶體限制)。設一個合理的值,以避免驅動器OOM。 |
spark.driver.memory |
1g | 驅動器程序可以用的記憶體總量(如:1g,2g)。 注意,在客戶端模式下,這個配置不能在SparkConf中直接設定(因為驅動器JVM都啟動完了呀!)。驅動器客戶端模式下,必須要在命令列裡用 –driver-memory 或者在預設屬性配置檔案裡設定。 |
spark.executor.memory |
1g | 單個執行器(executor)使用的記憶體總量(如,2g,8g) |
spark.extraListeners |
(none) | 逗號分隔的SparkListener子類的類名列表;初始化SparkContext時,這些類的例項會被創建出來,並且註冊到Spark的監聽總線上。如果這些類有一個接受SparkConf作為唯一引數的建構函式,那麼這個建構函式會被優先呼叫;否則,就呼叫無引數的預設建構函式。如果沒有建構函式,SparkContext建立的時候會拋異常。 |
spark.local.dir |
/tmp | Spark的”草稿“目錄,包括map輸出的臨時檔案,或者RDD存在磁碟上的資料。這個目錄最好在本地檔案系統中,這樣讀寫速度快。這個配置可以接受一個逗號分隔的列表,通常用這種方式將檔案IO分散不同的磁碟上去。 注意:Spark-1.0及以後版本中,這個屬性會被叢集管理器所提供的環境變數覆蓋:SPARK_LOCAL_DIRS(獨立部署或Mesos)或者 LOCAL_DIRS(YARN)。 |
spark.logConf |
false | SparkContext啟動時是否把生效的 SparkConf 屬性以INFO日誌列印到日誌裡 |
spark.master |
(none) |
除了這些以外,以下還有很多可用的引數配置,在某些特定情形下,可能會用到:
執行時環境
屬性名 | 預設值 | 含義 |
---|---|---|
spark.driver.extraClassPath |
(none) | 額外的classpath,將插入到到驅動器的classpath開頭。 注意:驅動器如果執行客戶端模式下,這個配置不能通過SparkConf 在程式裡配置,因為這時候程式已經啟動呀!而是應該用命令列引數(–driver-class-path)或者在 conf/spark-defaults.conf 配置。 |
spark.driver.extraJavaOptions |
(none) | 驅動器額外的JVM選項。如:GC設定或其他日誌引數。 注意:驅動器如果執行客戶端模式下,這個配置不能通過SparkConf在程式裡配置,因為這時候程式已經啟動呀!而是應該用命令列引數(–driver-java-options)或者conf/spark-defaults.conf 配置。 |
spark.driver.extraLibraryPath |
(none) | 啟動驅動器JVM時候指定的依賴庫路徑。 注意:驅動器如果執行客戶端模式下,這個配置不能通過SparkConf在程式裡配置,因為這時候程式已經啟動呀!而是應該用命令列引數(–driver-library-path)或者conf/spark-defaults.conf 配置。 |
spark.driver.userClassPathFirst |
false | (試驗性的:即未來不一定會支援該配置) 驅動器是否首選使用使用者指定的jars,而不是spark自身的。這個特性可以用來處理使用者依賴和spark本身依賴項之間的衝突。目前還是試驗性的,並且只能用在叢集模式下。 |
spark.executor.extraClassPath |
(none) | 新增到執行器(executor)classpath開頭的classpath。主要為了向後相容老的spark版本,不推薦使用。 |
spark.executor.extraJavaOptions |
(none) | 傳給執行器的額外JVM引數。如:GC設定或其他日誌設定等。注意,不能用這個來設定Spark屬性或者堆記憶體大小。Spark屬性應該用SparkConf物件,或者spark-defaults.conf檔案(會在spark-submit指令碼中使用)來配置。執行器堆記憶體大小應該用 spark.executor.memory配置。 |
spark.executor.extraLibraryPath |
(none) | 啟動執行器JVM時使用的額外依賴庫路徑。 |
spark.executor.logs.rolling.maxRetainedFiles |
(none) | Sets the number of latest rolling log files that are going to be retained by the system. Older log files will be deleted. Disabled by default.設定日誌檔案最大保留個數。老日誌檔案將被幹掉。預設禁用的。 |
spark.executor.logs.rolling.maxSize |
(none) | 設定執行器日誌檔案大小上限。預設禁用的。 需要自動刪日誌請參考 spark.executor.logs.rolling.maxRetainedFiles. |
spark.executor.logs.rolling.strategy |
(none) | 執行器日誌滾動策略。預設禁用。 可接受的值有”time”(基於時間滾動) 或者 “size”(基於檔案大小滾動)。 time:將使用 spark.executor.logs.rolling.time.interval設定滾動時間間隔 size:將使用 spark.executor.logs.rolling.size.maxBytes設定檔案大小上限 |
spark.executor.logs.rolling.time.interval |
daily | 設定執行器日誌滾動時間間隔。日誌滾動預設是禁用的。 可用的值有 “daily”, “hourly”, “minutely”,也可設為數字(則單位為秒)。 關於日誌自動清理,請參考 spark.executor.logs.rolling.maxRetainedFiles |
spark.executor.userClassPathFirst |
false | (試驗性的)與 spark.driver.userClassPathFirst類似,只不過這個引數將應用於執行器 |
spark.executorEnv.[EnvironmentVariableName] |
(none) | 向執行器程序增加名為EnvironmentVariableName的環境變數。使用者可以指定多個來設定不同的環境變數。 |
spark.python.profile |
false | 對Python worker啟用效能分析,效能分析結果會在sc.show_profile()中,或者在驅動器退出前展示。也可以用sc.dump_profiles(path)輸出到磁碟上。如果部分分析結果被手動展示過,那麼驅動器退出前就不再自動展示了。預設會使用pyspark.profiler.BasicProfiler,也可以自己傳一個profiler 類引數給SparkContext建構函式。 |
spark.python.profile.dump |
(none) | 這個目錄是用來在驅動器退出前,dump效能分析結果。效能分析結果會按RDD分別dump。同時可以使用ptats.Stats()來裝載。如果制定了這個,那麼分析結果就不再自動展示。 |
spark.python.worker.memory |
512m | 聚合時每個python worker使用的記憶體總量,和JVM的記憶體字串格式相同(如,512m,2g)。如果聚合時使用的記憶體超過這個量,就將資料溢位到磁碟上。 |
spark.python.worker.reuse |
true | 是否複用Python worker。如果是,則每個任務會啟動固定數量的Python worker,並且不需要fork() python程序。如果需要廣播的資料量很大,設為true能大大減少廣播資料量,因為需要廣播的程序數減少了。 |
混洗行為
屬性名 | 預設值 | 含義 |
---|---|---|
spark.reducer.maxSizeInFlight |
48m | map任務輸出同時reduce任務獲取的最大記憶體佔用量。每個輸出需要建立buffer來接收,對於每個reduce任務來說,有一個固定的記憶體開銷上限,所以最好別設太大,除非你記憶體非常大。 |
spark.shuffle.compress |
true | 是否壓縮map任務的輸出檔案。通常來說,壓縮是個好主意。使用的壓縮演算法取決於 spark.io.compression.codec |
spark.shuffle.file.buffer |
32k | 每個混洗輸出流的記憶體buffer大小。這個buffer能減少混洗檔案的建立和磁碟定址。 |
spark.shuffle.io.maxRetries |
3 | (僅對netty)如果IO相關異常發生,重試次數(如果設為非0的話)。重試能是大量資料的混洗操作更加穩定,因為重試可以有效應對長GC暫停或者網路閃斷。 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (僅netty)主機之間的連線是複用的,這樣可以減少大叢集中重複建立連線的次數。然而,有些叢集是機器少,磁碟多,這種叢集可以考慮增加這個引數值,以便充分利用所有磁碟併發效能。 |
spark.shuffle.io.preferDirectBufs |
true | (僅netty)堆外快取可以有效減少垃圾回收和快取複製。對於堆外記憶體緊張的使用者來說,可以考慮禁用這個選項,以迫使netty所有記憶體都分配在堆上。 |
spark.shuffle.io.retryWait |
5s | (僅netty)混洗重試獲取資料的間隔時間。預設最大重試延遲是15秒,設定這個引數後,將變成maxRetries* retryWait。 |
spark.shuffle.manager |
sort | 混洗資料的實現方式。可用的有”sort”和”hash“。基於排序(sort)的混洗記憶體利用率更高,並且從1.2開始已經是預設值了。 |
spark.shuffle.service.enabled |
false | 啟用外部混洗服務。啟用外部混洗服務後,執行器生成的混洗中間檔案就由該服務保留,這樣執行器就可以安全的退出了。如果 spark.dynamicAllocation.enabled啟用了,那麼這個引數也必須啟用,這樣動態分配才能有外部混洗服務可用。 更多請參考:dynamic allocation configuration and setup documentation |
spark.shuffle.service.port |
7337 | 外部混洗服務對應埠 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (高階)在基於排序(sort)的混洗管理器中,如果沒有map端聚合的話,就會最多存在這麼多個reduce分割槽。 |
spark.shuffle.spill.compress |
true | 是否在混洗階段壓縮溢位到磁碟的資料。壓縮演算法取決於spark.io.compression.codec |
Spark UI
屬性名 | 預設值 | 含義 |
---|---|---|
spark.eventLog.compress |
false | 是否壓縮事件日誌(當然spark.eventLog.enabled必須開啟) |
spark.eventLog.dir |
file:///tmp/spark-events | Spark events日誌的基礎目錄(當然spark.eventLog.enabled必須開啟)。在這個目錄中,spark會給每個應用建立一個單獨的子目錄,然後把應用的events log打到子目錄裡。使用者可以設定一個統一的位置(比如一個HDFS目錄),這樣history server就可以從這裡讀取歷史檔案。 |
spark.eventLog.enabled |
false | 是否啟用Spark事件日誌。如果Spark應用結束後,仍需要在SparkUI上檢視其狀態,必須啟用這個。 |
spark.ui.killEnabled |
true | 允許從SparkUI上殺掉stage以及對應的作業(job) |
spark.ui.port |
4040 | SparkUI埠,展示應用程式執行狀態。 |
spark.ui.retainedJobs |
1000 | SparkUI和status API最多保留多少個spark作業的資料(當然是在垃圾回收之前) |
spark.ui.retainedStages |
1000 | SparkUI和status API最多保留多少個spark步驟(stage)的資料(當然是在垃圾回收之前) |
spark.worker.ui.retainedExecutors |
1000 | SparkUI和status API最多保留多少個已結束的執行器(executor)的資料(當然是在垃圾回收之前) |
spark.worker.ui.retainedDrivers |
1000 | SparkUI和status API最多保留多少個已結束的驅動器(driver)的資料(當然是在垃圾回收之前) |
spark.sql.ui.retainedExecutions |
1000 | SparkUI和status API最多保留多少個已結束的執行計劃(execution)的資料(當然是在垃圾回收之前) |
spark.streaming.ui.retainedBatches |
1000 | SparkUI和status API最多保留多少個已結束的批量(batch)的資料(當然是在垃圾回收之前) |
壓縮和序列化
屬性名 | 預設值 | 含義 |
---|---|---|
spark.broadcast.compress |
true | 是否在廣播變數前使用壓縮。通常是個好主意。 |
spark.closure.serializer |
org.apache.spark.serializer. JavaSerializer |
閉包所使用的序列化類。目前只支援Java序列化。 |
spark.io.compression.codec |
snappy | 內部資料使用的壓縮演算法,如:RDD分割槽、廣播變數、混洗輸出。Spark提供了3中演算法:lz4,lzf,snappy。你也可以使用全名來指定壓縮演算法:org.apache.spark.io.LZ4CompressionCodec ,org.apache.spark.io.LZFCompressionCodec ,org.apache.spark.io.SnappyCompressionCodec . |
spark.io.compression.lz4.blockSize |
32k | LZ4演算法使用的塊大小。當然你需要先使用LZ4壓縮。減少塊大小可以減少混洗時LZ4算法佔用的記憶體量。 |
spark.io.compression.snappy.blockSize |
32k | Snappy演算法使用的塊大小(先得使用Snappy演算法)。減少塊大小可以減少混洗時Snappy算法佔用的記憶體量。 |
spark.kryo.classesToRegister |
(none) | 如果你使用Kryo序列化,最好指定這個以提高效能(tuning guide)。 本引數接受一個逗號分隔的類名列表,這些類都會註冊為Kryo可序列化型別。 |
spark.kryo.referenceTracking |
true (false when using Spark SQL Thrift Server) | 是否跟蹤同一物件在Kryo序列化的引用。如果你的物件圖中有迴圈護著包含統一物件的多份拷貝,那麼最好啟用這個。如果沒有這種情況,那就禁用以提高效能。 |
spark.kryo.registrationRequired |
false | Kryo序列化時,是否必須事先註冊。如果設為true,那麼Kryo遇到沒有註冊過的型別,就會拋異常。如果設為false(預設)Kryo會序列化未註冊型別的物件,但會有比較明顯的效能影響,所以啟用這個選項,可以強制必須在序列化前,註冊可序列化型別。 |
spark.kryo.registrator |
(none) | 如果你使用Kryo序列化,用這個class來註冊你的自定義型別。如果你需要自定義註冊方式,這個引數很有用。否則,使用 spark.kryo.classesRegister更簡單。要設定這個引數,需要用tuning guide 。 |
spark.kryoserializer.buffer.max |
64m | 最大允許的Kryo序列化buffer。必須必你所需要序列化的物件要大。如果你在Kryo中看到”buffer limit exceeded”這個異常,你就得增加這個值了。 |
spark.kryoserializer.buffer |
64k | Kryo序列化的初始buffer大小。注意,每臺worker上對應每個core會有一個buffer。buffer最大增長到 spark.kryoserializer.buffer.max |
spark.rdd.compress |
false | 是否壓縮序列化後RDD的分割槽(如:StorageLevel.MEMORY_ONLY_SER)。能節省大量空間,但多消耗一些CPU。 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer (org.apache.spark.serializer. KryoSerializer when using Spark SQL Thrift Server) |
|
spark.serializer.objectStreamReset |
100 | 如果使用org.apache.spark.serializer.JavaSerializer做序列化器,序列化器快取這些物件,以避免輸出多餘資料,然而,這個會打斷垃圾回收。通過呼叫reset來flush序列化器,從而使老物件被回收。要禁用這一週期性reset,需要把這個引數設為-1,。預設情況下,序列化器會每過100個物件,被reset一次。 |
記憶體管理
屬性名 | 預設值 | 含義 |
---|---|---|
spark.memory.fraction |
0.75 | 堆記憶體中用於執行、混洗和儲存(快取)的比例。這個值越低,則執行中溢位到磁碟越頻繁,同時快取被逐出記憶體也更頻繁。這個配置的目的,是為了留出使用者自定義資料結構、內部元資料使用的記憶體。推薦使用預設值。請參考this description. |
spark.memory.storageFraction |
0.5 | 不會被逐出記憶體的總量,表示一個相對於 spark.memory.fraction的比例。這個越高,那麼執行混洗等操作用的記憶體就越少,從而溢位磁碟就越頻繁。推薦使用預設值。更詳細請參考 this description. |
spark.memory.offHeap.enabled |
true | 如果true,Spark會嘗試使用堆外記憶體。啟用 後,spark.memory.offHeap.size必須為正數。 |
spark.memory.offHeap.size |
0 | 堆外記憶體分配的大小(絕對值)。這個設定不會影響堆記憶體的使用,所以你的執行器總記憶體必須適應JVM的堆記憶體大小。必須要設為正數。並且前提是 spark.memory.offHeap.enabled=true. |
spark.memory.useLegacyMode |
false | 是否使用老式的記憶體管理模式(1.5以及之前)。老模式在堆記憶體管理上更死板,使用固定劃分的區域做不同功能,潛在的會導致過多的資料溢位到磁碟(如果不小心調整效能)。必須啟用本引數,以下選項才可用:spark.shuffle.memoryFraction spark.storage.memoryFraction spark.storage.unrollFraction |
spark.shuffle.memoryFraction |
0.2 | (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。 混洗階段用於聚合和協同分組的JVM堆記憶體比例。在任何指定的時間,所有用於混洗的記憶體總和不會超過這個上限,超出的部分會溢位到磁碟上。如果溢位臺頻繁,考慮增加spark.storage.memoryFraction的大小。 |
spark.storage.memoryFraction |
0.6 | (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。 Spark用於快取資料的對記憶體比例。這個值不應該比JVM 老生代(old generation)物件所佔用的記憶體大,預設是60%的堆記憶體,當然你可以增加這個值,同時配置你所用的老生代物件佔用記憶體大小。 |
spark.storage.unrollFraction |
0.2 | (廢棄)必須先啟用spark.memory.useLegacyMode這個才有用。 Spark塊展開的記憶體佔用比例。如果沒有足夠的記憶體來完整展開新的塊,那麼老的塊將被拋棄。 |
執行行為
屬性名 | 預設值 | 含義 |
---|---|---|
spark.broadcast.blockSize |
4m | TorrentBroadcastFactory每個分片大小。太大會減少廣播時候的併發數(更慢了);如果太小,BlockManager可能會給出效能提示。 |
spark.broadcast.factory |
org.apache.spark.broadcast. TorrentBroadcastFactory |
廣播演算法的實現。 |
spark.cleaner.ttl |
(infinite) | Spark記住任意元資料的保留時間(秒)。週期性的清理能保證比這個更老的元資料將被遺忘(刪除)。這對於長期執行的Spark作業非常有用(如,一些7*24執行)。注意,RDD持久化到記憶體中後,過了這麼長時間以後,也會被清理掉(這。。。是不是有點坑!)。 |
spark.executor.cores |
YARN模式下預設1;如果是獨立部署,則是worker節點上所有可用的core。 | 單個執行器可用的core數。僅針對YARN和獨立部署模式。獨立部署時,單個worker節點上會執行多個執行器(executor),只要worker上有足夠的core。否則,每個應用在單個worker上只會啟動一個執行器。 |
spark.default.parallelism |
對於reduceByKey和join這樣的分散式混洗(shuffle)運算元,等於父RDD中最大的分割槽。對於parallelize這樣沒有父RDD的運算元,則取決於叢集管理器:
|
如果使用者沒有在引數裡指定,這個屬性是預設的RDD transformation運算元分割槽數,如:join,reduceByKey,parallelize等。 |
spark.executor.heartbeatInterval |
10s | 執行器心跳間隔(報告心跳給驅動器)。心跳機制使驅動器瞭解哪些執行器還活著,並且可以從心跳資料中獲得執行器的度量資料。 |
spark.files.fetchTimeout |
60s | 獲取檔案的通訊超時,所獲取的檔案是通過在驅動器上呼叫SparkContext.addFile()新增的。 |
spark.files.useFetchCache |
true | 如果設為true(預設),則同一個spark應用的不同執行器之間,會使用一二共享快取來拉取檔案,這樣可以提升同一主機上執行多個執行器時候,任務啟動的效能。如果設為false,這個優化就被禁用,各個執行器將使用自己獨有的快取,他們拉取的檔案也是各自有一份拷貝。如果在NFS檔案系統上使用本地檔案系統,可以禁用掉這個優化(參考SPARK-6313) |
spark.files.overwrite |
false | SparkContext.addFile()新增的檔案已經存在,且內容不匹配的情況下,是否覆蓋。 |
spark.hadoop.cloneConf |
false | 如設為true,對每個任務複製一份Hadoop Configuration物件。啟用這個可以繞過Configuration執行緒安全問題(SPARK-2546 )。預設這個是禁用的,很多job並不會受這個issue的影響。 |
spark.hadoop.validateOutputSpecs |
true | 如設為true,在saveAsHadoopFile及其變體的時候,將會驗證輸出(例如,檢查輸出目錄是否存在)。對於已經驗證過或確認存在輸出目錄的情況,可以禁用這個。我們建議不要禁用,除非你確定需要和之前的spark版本相容。可以簡單的利用Hadoop 檔案系統API手動刪掉已存在的輸出目錄。這個設定會被Spark Streaming StreamingContext生成的job忽略,因為Streaming需要在回覆檢查點的時候,覆蓋已有的輸出目錄。 |
spark.storage.memoryMapThreshold |
2m | spark從磁碟上讀取一個塊後,對映到記憶體塊的最小大小。這阻止了spark對映過小的記憶體塊。通常,記憶體對映塊是有開銷的,應該比接近或小於作業系統的頁大小。 |
spark.externalBlockStore.blockManager |
org.apache.spark.storage.TachyonBlockManager | 用於儲存RDD的外部塊管理器(檔案系統)的實現。 檔案系統URL由spark.externalBlockStore.url決定。 |
spark.externalBlockStore.baseDir |
System.getProperty(“java.io.tmpdir”) | 外部塊儲存存放RDD的目錄。檔案系統URL由spark.externalBlockStore.url決定。也可以是逗號分隔的目錄列表(Tachyon檔案系統) |
spark.externalBlockStore.url |
tachyon://localhost:19998 for Tachyon | 所使用的外部塊儲存檔案系統URL。 |
網路
屬性名 | 預設值 | 含義 |
---|---|---|
spark.akka.frameSize |
128 | “control plane” 通訊中所允許的最大訊息大小(MB)。通常,只應用於map輸出資料的大小資訊,這些資訊會在執行器和驅動器之間傳遞。如果你的job包含幾千個map和reduce任務,你可能需要增大這個設定。 |
spark.akka.heartbeat.interval |
1000s | 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。設成較大的值可以減少網路開銷,而較小的值(1秒左右)可能會對Akka的失敗檢測更有用。如有需要,可以調整這個值和spark.akka.heartbeat.pauses的組合。一種可能需要使用失敗檢測的情形是:用一個敏感的失敗檢測,可以快速識別並逐出不穩定的執行器。然而,在真實的spark叢集中,這通常不是GC暫停或網路延遲造成的。除此之外,啟用這個還會導致過多的心跳資料交換,從而造成網路洪峰。 |
spark.akka.heartbeat.pauses |
6000s | 設這麼大的值,是為了禁用Akka傳輸失敗檢測器。也可以重新啟用,如果你想用這個特性(但不建議)。這個是可接受的Akka心跳暫停時間。這個可以用來控制對GC暫停敏感程度。如有需要,可以調整這個值和spark.akka.heartbeat.interval的組合。 |
spark.akka.threads |
4 | 用於通訊的actor執行緒數。如果驅動器機器上有很多CPU core,你可以適當增大這個值。 |
spark.akka.timeout |
100s | Spark節點之間通訊超時。 |
spark.blockManager.port |
(random) | 塊管理器(block manager)監聽埠。在驅動器和執行器上都有。 |
spark.broadcast.port |
(random) | 驅動器HTTP廣播server監聽埠。這和torrent廣播沒有關係。 |
spark.driver.host |
(local hostname) | 驅動器主機名。用於和執行器以及獨立部署時叢集master通訊。 |
spark.driver.port |
(random) | 驅動器埠。用於和執行器以及獨立部署時叢集master通訊。 |
spark.executor.port |
(random) | 執行器埠。用於和驅動器通訊。 |
spark.fileserver.port |
(random) | 驅動器HTTP檔案server監聽埠。 |
spark.network.timeout |
120s | 所有網路互動的預設超時。這個配置是以下屬性的預設值:spark.core.connection.ack.wait.timeout ,spark.akka.timeout ,spark.storage.blockManagerSlaveTimeoutMs ,spark.shuffle.io.connectionTimeout ,spark.rpc.askTimeout orspark.rpc.lookupTimeout |
spark.port.maxRetries |
16 | 繫結一個埠的最大重試次數。如果指定了一個埠(非0),每個後續重試會在之前嘗試的埠基礎上加1,然後再重試繫結。本質上,這確定了一個繫結埠的範圍,就是 [start port, start port + maxRetries] |
spark.replClassServer.port |
(random) | 驅動器HTTP class server的監聽埠。只和spark shell相關。 |
spark.rpc.numRetries |
3 | RPC任務最大重試次數。RPC任務最多重試這麼多次。 |
spark.rpc.retry.wait |
3s | RPC請求操作重試前等待時間。 |
spark.rpc.askTimeout |
120s | RPC請求操作超時等待時間。 |
spark.rpc.lookupTimeout |
120s | RPC遠端端點查詢超時。 |
排程
屬性名 | 預設值 | 含義 |
---|---|---|
spark.cores.max |
(not set) | 如果執行在獨立部署叢集模式(standalone deploy cluster)或者Mesos叢集粗粒度共享模式(Mesos cluster in “coarse-grained” sharing mode),這個值決定了spark應用可以使用的最大CPU總數(應用在整個叢集中可用CPU總數,而不是單個機器)。如果不設定,那麼獨立部署時預設為spark.deploy.defaultCores,Mesos叢集則預設無限制(即所有可用的CPU)。 |
spark.locality.wait |
3s | 為了資料本地性最長等待時間(spark會根據資料所在位置,儘量讓任務也啟動於相同的節點,然而可能因為該節點上資源不足等原因,無法滿足這個任務分配,spark最多等待這麼多時間,然後放棄資料本地性)。資料本地性有多個級別,每一級別都是等待這麼多時間(同一程序、同一節點、同一機架、任意)。你也可以為每個級別定義不同的等待時間,需要設定spark.locality.wait.node等。如果你發現任務資料本地性不佳,可以增加這個值,但通常預設值是ok的。 |
spark.locality.wait.node |
spark.locality.wait | 單獨定義同一節點資料本地性任務等待時間。你可以設為0,表示忽略節點本地性,直接跳到下一級別,即機架本地性(如果你的叢集有機架資訊)。 |
spark.locality.wait.process |
spark.locality.wait | 單獨定義同一程序資料本地性任務等待時間。這個引數影響試圖訪問特定執行器上的快取資料的任務。 |
spark.locality.wait.rack |
spark.locality.wait | 單獨定義同一機架資料本地性等待時間。 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30s | 排程開始前,向叢集管理器註冊使用資源的最大等待時間。 |
spark.scheduler.minRegisteredResourcesRatio |
0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode |
排程啟動前,需要註冊得到資源的最小比例(註冊到的資源數 / 需要資源總數)(YARN模式下,資源是執行器;獨立部署和Mesos粗粒度模式下時資源是CPU core【spark.cores.max是期望得到的資源總數】)。可以設為0.0~1.0的一個浮點數。不管job是否得到了最小資源比例,最大等待時間都是由spark.scheduler.maxRegisteredResourcesWaitingTime控制的。 |
spark.scheduler.mode |
FIFO | 提交到同一個SparkContext上job的排程模式(scheduling mode)。另一個可接受的值是FAIR,而FIFO只是簡單的把job按先來後到排隊。對於多使用者服務很有用。 |
spark.scheduler.revive.interval |
1s | 排程器復活worker的間隔時間。 |
spark.speculation |
false | 如果設為true,將會啟動推測執行任務。這意味著,如果stage中有任務執行較慢,他們會被重新排程到別的節點上執行。 |
spark.speculation.interval |
100ms | Spark檢查慢任務的時間間隔。 |
spark.speculation.multiplier |
1.5 | 比任務平均執行時間慢多少倍的任務會被認為是慢任務。 |
spark.speculation.quantile |
0.75 | 對於一個stage來說,完成多少百分比才開始檢查慢任務,並啟動推測執行任務。 |
spark.task.cpus |
1 | 每個任務分配的CPU core。 |
spark.task.maxFailures |
4 | 單個任務最大失敗次數。應該>=1。最大重試次數 = spark.task.maxFailures – 1 |
動態分配
屬性名 | 預設值 | 含義 |
---|---|---|
spark.dynamicAllocation.enabled |
false | 是否啟用動態資源分配特性,啟用後,執行器的個數會根據工作負載動態的調整(增加或減少)。注意,目前在YARN模式下不用。更詳細資訊,請參考: here該特性依賴於 spark.shuffle.service.enabled 的啟用。同時還和以下配置相關:spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors以及 spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation |
60s | 動態分配特性啟用後,空閒時間超過該配置時間的執行器都會被移除。更詳細請參考這裡:description |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | 動態分配特性啟用後,包含快取資料的執行器如果空閒時間超過該配置設定的時間,則被移除。更詳細請參考:description |
spark.dynamicAllocation.initialExecutors |
spark .dynamicAllocation .minExecutors |
動態分配開啟後,執行器的初始個數 |
spark.dynamicAllocation.maxExecutors |
infinity | 動態分配開啟後,執行器個數的上限 |
spark.dynamicAllocation.minExecutors |
0 | 動態分配開啟後,執行器個數的下限 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | 動態分配啟用後,如果有任務積壓的持續時間長於該配置設定的時間,則申請新的執行器。更詳細請參考:description |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
和spark.dynamicAllocation.schedulerBacklogTimeout類似,只不過該配置對應於隨後持續的執行器申請。更詳細請參考: description |
安全
屬性名 | 預設值 | 含義 |
---|---|---|
spark.acls.enable |
false | 是否啟用Spark acls(訪問控制列表)。如果啟用,那麼將會檢查使用者是否有許可權檢視或修改某個作業(job)。注意,檢查的前提是需要知道使用者是誰,所以如果使用者是null,則不會做任何檢查。你可以在Spark UI上設定過濾器(Filters)來做使用者認證,並設定使用者名稱。 |
spark.admin.acls |
Empty | 逗號分隔的使用者列表,在該列表中的使用者/管理員將能夠訪問和修改所有的Spark作業(job)。如果你的叢集是共享的,並且有叢集管理員,還有需要除錯的開發人員,那麼這個配置會很有用。如果想讓所有人都有管理員許可權,只需把該配置設定為”*” |
spark.authenticate |
false | 設定Spark是否認證叢集內部連線。如果不是在YARN上執行,請參考 spark.authenticate.secret |
spark.authenticate.secret |
None | 設定Spark用於內部元件認證的祕鑰。如果不是在YARN上執行,且啟用了 spark.authenticate,那麼該配置必須設定 |
spark.authenticate.enableSaslEncryption |
false | 是否對Spark內部元件認證使用加密通訊。該配置目前只有 block transfer service 使用。 |
spark.network.sasl.serverAlwaysEncrypt |
false | 是否對支援SASL認證的service禁用非加密通訊。該配置目前只有 external shuffle service 支援。 |
spark.core.connection.ack.wait.timeout |
60s | 網路連線等待應答訊號的超時時間。為了避免由於GC等導致的意外超時,你可以設定一個較大的值。 |
spark.core.connection.auth.wait.timeout |
30s | 網路連線等待認證的超時時間。 |
spark.modify.acls |
Empty | 逗號分隔的使用者列表,在改列表中的使用者可以修改Spark作業。預設情況下,只有啟動該Spark作業的使用者可以修改之(比如殺死該作業)。如果想要任何使用者都可以修改作業,請將該配置設定為”*” |
spark.ui.filters |
None | 逗號分隔的過濾器class列表,這些過濾器將用於Spark web UI。這裡的過濾器應該是一個標準的 javax servlet Filter 。每個過濾器的引數可以通過java系統屬性來設定,如下:spark.<class name of filer>.params=’param1=value1,param2=value2’例如: -Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params=’param1=foo,param2=testing’ |
spark.ui.view.acls |
Empty | 逗號分隔的使用者列表,在該列表中的使用者可以檢視Spark web UI。預設,只有啟動該Spark作業的使用者可以檢視之。如果需要讓所有使用者都能檢視,只需將該配置設為”*” |
加密
屬性名 | 預設值 | 含義 |
---|---|---|
spark.ssl.enabled |
false | 是否啟用SSL連線(在所有所支援的協議上)。所有SSL相關配置(spark.ssl.xxx,其中xxx是一個特定的配置屬性),都是全域性的。如果需要在某些協議上覆蓋全域性設定,那麼需要在該協議名稱空間上進行單獨配置。使用 spark.ssl.YYY.XXX 來為協議YYY覆蓋全域性配置XXX。目前YYY的可選值有 akka(用於基於AKKA框架的網路連線) 和 fs(用於應廣播和檔案伺服器) |
spark.ssl.enabledAlgorithms |
Empty | 逗號分隔的加密演算法列表。這些加密演算法必須是JVM所支援的。這裡有個可用加密演算法參考列表: this |
spark.ssl.keyPassword |
None | 在key-store中私匙對應的密碼。 |
spark.ssl.keyStore |
None | key-store檔案路徑。可以是絕對路徑,或者以本元件啟動的工作目錄為基礎的相對路徑。 |
spark.ssl.keyStorePassword |
None | key-store的密碼。 |
spark.ssl.protocol |
None | 協議名稱。該協議必須是JVM所支援的。這裡有JVM支援的協議參考列表:this |
spark.ssl.trustStore |
None | trust-store檔案路徑。可以是絕對路徑,或者以本元件啟動的工作目錄為基礎的相對路徑。 |
spark.ssl.trustStorePassword |
None | trust-store的密碼 |
Spark Streaming [流式]
屬性名 | 預設值 | 含義 |
---|---|---|
spark.streaming.backpressure.enabled |
false | 是否啟用Spark Streaming 的內部反壓機制(spark 1.5以上支援)。啟用後,Spark Streaming會根據當前批次的排程延遲和處理時長來控制接收速率,這樣一來,系統的接收速度會和處理速度相匹配。該特性會在內部動態地設定接收速率。該速率的上限將由 spark.streaming.receiver.maxRate 和 spark.streaming.kafka.maxRatePerPartition 決定(如果它們設定了的話)。 |
spark.streaming.blockInterval |
200ms | 在將資料儲存到Spark之前,Spark Streaming接收器組裝資料塊的時間間隔。建議不少於50ms。關於Spark Streaming程式設計指南細節,請參考 performance tuning 這一節。 |
spark.streaming.receiver.maxRate |
not set | 接收速度的最大速率(每秒記錄條數)。實際上,每個流每秒將消費這麼多條記錄。設定為0或者負數表示不限制速率。更多細節請參考: deployment guide |
spark.streaming.receiver.writeAheadLog.enable |
false | 是否啟用接收器預寫日誌。所有的輸入資料都會儲存到預寫日誌中,這樣在驅動器失敗後,可以基於預寫日誌來恢復資料。更詳細請參考:deployment guide |
spark.streaming.unpersist |
true | 是否強制Spark Streaming 自動從記憶體中清理掉所生成並持久化的RDD。同時,Spark Streaming收到的原始資料也將會被自動清理掉。如果設定為false,那麼原始資料以及持久化的RDD將不會被自動清理,以便外部程式可以訪問這些資料。當然,這將導致Spark消耗更多的記憶體。 |
spark.streaming.stopGracefullyOnShutdown |
false | 如果設為true,Spark將會在JVM關閉時,優雅地關停StreamingContext,而不是立即關閉之。 |
spark.streaming.kafka.maxRatePerPartition |
not set | 在使用Kafka direct stream API時,從每個Kafka資料分割槽讀取資料的最大速率(每秒記錄條數)。更詳細請參考:Kafka Integration guide |
spark.streaming.kafka.maxRetries |
1 | 驅動器連續重試的最大次數,這個配置是為了讓驅動器找出每個Kafka分割槽上的最大offset(預設值為1,意味著驅動器將最多嘗試2次)。只對新的Kafka direct stream API有效。 |
spark.streaming.ui.retainedBatches |
1000 | Spark Streaming UI 以及 status API 中保留的最大批次個數。 |
SparkR
屬性名 | 預設值 | 含義 |
---|---|---|
spark.r.numRBackendThreads |
2 | SparkR RBackEnd處理RPC呼叫的後臺執行緒數 |
spark.r.command |
Rscript | 叢集模式下,驅動器和worker上執行的R指令碼可執行檔案 |
spark.r.driver.command |
spark.r.command | client模式的驅動器執行的R指令碼。叢集模式下會忽略 |
叢集管理器
每個叢集管理器都有一些額外的配置選項。詳細請參考這裡:
YARN
Mesos
環境變數
有些Spark設定需要通過環境變數來設定,這些環境變數可以在${SPARK_HOME}/conf/spark-env.sh指令碼(Windows下是conf/spark-env.cmd)中設定。如果是獨立部署或者Mesos模式,這個檔案可以指定機器相關資訊(如hostname)。執行本地Spark應用或者submit指令碼時,也會引用這個檔案。
注意,conf/spark-env.sh預設是不存在的。你需要複製conf/spark-env.sh.template這個模板來建立,還有注意給這個檔案附上可執行許可權。
以下變數可以在spark-env.sh中設定:
環境變數 | 含義 |
---|---|
JAVA_HOME |
Java安裝目錄(如果沒有在PATH變數中指定) |
PYSPARK_PYTHON |
驅動器和worker上使用的Python二進位制可執行檔案(預設是python) |
PYSPARK_DRIVER_PYTHON |
僅在驅動上使用的Python二進位制可執行檔案(默認同PYSPARK_PYTHON) |
SPARKR_DRIVER_R |
SparkR shell使用的R二進位制可執行檔案(預設是R) |
SPARK_LOCAL_IP |
本地繫結的IP |
SPARK_PUBLIC_DNS |
Spark程式公佈給其他機器的hostname |
另外,還有一些選項需要在Spark standalone cluster scripts裡設定,如:每臺機器上使用的core數量,和最大記憶體佔用量。
spark-env.sh是一個shell指令碼,因此一些引數可以通過程式設計方式來設定 – 例如,你可以獲取本機IP來設定SPARK_LOCAL_IP。
日誌配置
Spark使用log4j 打日誌。你可以在conf目錄下用log4j.properties來配置。複製該目錄下已有的log4j.properties.template並改名為log4j.properties即可。
覆蓋配置目錄
預設Spark配置目錄是”${SPARK_HOME}/conf”,你也可以通過 ${SPARK_CONF_DIR}指定其他目錄。Spark會從這個目錄下讀取配置檔案(spark-defaults.conf,spark-env.sh,log4j.properties等)
繼承Hadoop叢集配置
如果你打算用Spark從HDFS讀取資料,那麼有2個Hadoop配置檔案必須放到Spark的classpath下:
- hdfs-site.xml,配置HDFS客戶端的預設行為
- core-site.xml,預設檔案系統名
這些配置檔案的路徑在不同釋出版本中不太一樣(如CDH和HDP版本),但通常都能在 ${HADOOP_HOME}/etc/hadoop/conf目錄下找到。一些工具,如Cloudera Manager,可以動態修改配置,而且提供了下載一份拷貝的機制。
要想讓這些配置對Spark可見,請在${SPARK_HOME}/spark-env.sh中設定HADOOP_CONF_DIR變數。