spark sql 內建配置(V2.2)
最近整理了一下spark SQL內建配。加粗配置項是對sparkSQL 調優效能影響比較大的項,小夥伴們按需酌情配置。後續會挑出一些通用調優配置,共大家參考。有不正確的地方,歡迎大家在留言區留言討論。
配置項 | 預設值 | 概述 |
spark.sql.optimizer.maxIterations | 100 | sql優化器最大迭代次數 |
spark.sql.optimizer.inSetConversionThreshold | 10 | 插入轉換的集合大小閾值 |
spark.sql.inMemoryColumnarStorage.compressed | TRUE | 當設定為true時,SCAPK SQL將根據資料的統計自動為每個列選擇壓縮編解碼器 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制用於列快取的批處理的大小。較大的批處理大小可以提高記憶體利用率和壓縮率,但快取資料時會出現OOM風險 |
spark.sql.inMemoryColumnarStorage.partitionPruning | TRUE | 啟用記憶體中的列表分割槽剪枝 |
spark.sql.join.preferSortMergeJoin | TRUE | When true, 使用sort merge join 代替 shuffle hash join |
spark.sql.sort.enableRadixSort |
TRUE | 使用基數排序,基數排序效能非常快,但是會額外使用over heap.當排序比較小的Row時,overheap 需要提高50% |
spark.sql.autoBroadcastJoinThreshold | 10L * 1024 * 1024 | 當執行join時,被廣播到worker節點上表最大位元組。當被設定為-1,則禁用廣播。當前僅僅支援 Hive Metastore tables,表大小的統計直接基於hive表的原始檔大小 |
spark.sql.limit.scaleUpFactor | 4 | 在執行查詢時,兩次嘗試之間讀取partation數目的增量。較高的值會導致讀取過多分割槽,較少的值會導致執行時間過長,因為浙江執行更多的作業 |
spark.sql.statistics.fallBackToHdfs | FALSE | 當不能從table metadata中獲取表的統計資訊,返回到hdfs。這否有用取決與表是否足夠小到能夠使用auto broadcast joins |
spark.sql.defaultSizeInBytes | Long.MaxValue | 在查詢計劃中表預設大小,預設被設定成Long.MaxValue 大於spark.sql.autoBroadcastJoinThreshold的值,也就意味著預設情況下不會廣播一個表,除非他足夠小 |
spark.sql.shuffle.partitions | 200 | 當為join/aggregation shuffle資料時,預設partition的數量 |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 64 * 1024 * 1024byte | The target post-shuffle input size in bytes of a task. |
spark.sql.adaptive.enabled | FALSE | 是否開啟adaptive query execution(自適應查詢執行) |
spark.sql.adaptive.minNumPostShufflePartitions | -1 | 測試用 |
spark.sql.subexpressionElimination.enabled | TRUE | When true, common subexpressions will be eliminated 當為真時,將刪除公共子表示式 |
spark.sql.caseSensitive | FALSE | 查詢分析器是否區分大小寫,預設情況下不區分。強烈建議不區分大小寫 |
spark.sql.constraintPropagation.enabled | 是否開啟優化,在查詢優化器期間推斷和傳播查詢計劃中的資料約束。對於某種型別的查詢計劃(例如有大量謂語和別名的查詢),約束傳播是昂貴的,會對整個執行時間產生負面影響。 | |
spark.sql.parser.escapedStringLiterals | FALSE | 2.0之前預設值為true,知否預設是否。正常文字能否包含在正則表示式中。 |
spark.sql.parquet.mergeSchema | FALSE | 若為true,在讀取parquet資料來源時,schema從所有檔案中合併出來。否則如果沒有可用的摘要檔案,則從概要檔案或隨機檔案中選擇模式 |
spark.sql.parquet.respectSummaryFiles | FALSE | 若為ture,假設parquet的所有部分檔案和概要檔案一致,在合併模式時會忽略他們。否則將會合並所有的部分檔案 |
spark.sql.parquet.binaryAsString | FALSE | 是否向下相容其他parquet生產系統(eg impala or older version spark sql ),不區分位元組資料和string資料寫到parquet schema,這個配置促使spark sql將二進位制資料作為string達到相容 |
spark.sql.parquet.int96AsTimestamp | TRUE | 是否使用Int96作為timestamp的儲存格式,可以避免精度損失丟失納秒部分,為其他parquet系統提供相容(impala) |
spark.sql.parquet.int64AsTimestampMillis | FALSE | 當為true,timestamp值將以Int64作為mlibs的儲存擴充套件型別,這種模式微秒將被丟棄 |
spark.sql.parquet.cacheMetadata | TRUE | 是否快取parquet的schema資料元,可以提升靜態資料的查詢效能 |
spark.sql.parquet.compression.codec | snappy | 支援型別:uncompressed", "snappy", "gzip", "lzo"。 指定parquet寫檔案的壓縮編碼方式 |
spark.sql.parquet.filterPushdown | TRUE | 是否開啟parquet過濾條件下推 |
spark.sql.parquet.writeLegacyFormat | FALSE | spark sql在拼接schema時是否遵循parquet的schema的規範 |
spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | parquet輸出提交器類,同城必須是org.apache.hadoop.mapreduce.OutputCommitter的子類,如果不是將不會建立資料來源摘要,即使配置開啟了parquet.enable.summary-metadata |
spark.sql.parquet.enableVectorizedReader | TRUE | 開啟parquet向量解碼 |
spark.sql.orc.filterPushdown | FALSE | 是否開啟條件下推到orc檔案寫 |
spark.sql.hive.verifyPartitionPath | FALSE | 當為true時,在讀取HDFS中儲存的資料時,檢查表根目錄下的所有分割槽路徑 |
spark.sql.hive.metastorePartitionPruning | TRUE | 當為true,spark sql的謂語將被下推到hive metastore中,更早的消除不匹配的分割槽,會影響到違背轉換成檔案源關係的hive表 |
spark.sql.hive.manageFilesourcePartitions | TRUE | 是否使用hive metastore管理spark sql的 dataSource表分割槽,若為true,dataSource表會在執行計劃期間使用分割槽剪枝 |
spark.sql.hive.filesourcePartitionFileCacheSize | 250 * 1024 * 1024 | 當非0時,開啟將分割槽檔案資料元快取到記憶體中,所有表共享一個快取,當開啟 hive filesource partition management(spark.sql.hive.manageFilesourcePartitions)時才會生效 |
spark.sql.hive.caseSensitiveInferenceMode | INFER_AND_SAVE | 設定無法從hive表屬性讀取分割槽大小寫模式時所採取的操作,雖然Spice SQL本身不區分大小寫,但hive相容的檔案格式如parquet。Spark sql必須使用一個保持情況的模式,當查詢由包含區分大小寫欄位名或查詢的檔案支援的任何表可能無法返回準確的結果時。有效選項包括INFER_AND_SAVE(預設模式——從基礎資料檔案推斷出區分大小寫的模式,並將其寫入表屬性),INFER_ONLY(推斷schema但不嘗試將其寫入表屬性)和NEVER_INFER(回退到使用區分大小寫間接轉移模式代替推斷) |
spark.sql.optimizer.metadataOnly | TRUE | 當為true時,啟用僅使用表的元資料的元資料查詢優化來生成分割槽列,而不是表掃描。當掃描的所有列都是分割槽列,並且查詢具有滿足不同語義的聚合運算子時,它適用。 |
spark.sql.columnNameOfCorruptRecord | _corrupt_record | 當json/csv資料內部列解析失敗時,失敗列的名稱 |
spark.sql.broadcastTimeout" | 5*60 | 在broadCast join時 ,廣播等待的超時時間 |
spark.sql.thriftserver.scheduler.pool | 為JDBC客戶端會話設定公平排程程式池 | |
spark.sql.thriftServer.incrementalCollect | FALSE | 當TRUE時,啟用增量集合以在thrift server中執行 |
spark.sql.thriftserver.ui.retainedStatements | 200 | JDBC/ODBC Web使用者介面歷史記錄中SQL語句的數量 |
spark.sql.thriftserver.ui.retainedSessions | 200 | JDBC/ODBC Web UI歷史中儲存的SQL客戶端會話數 |
spark.sql.sources.default | parquet | 輸入輸出預設資料元 |
spark.sql.hive.convertCTAS | FALSE | 如果時true,將使用spark.sql.sources.default.設定資料來源,不指定任何儲存屬性到hive ctas語句 |
spark.sql.hive.gatherFastStats | TRUE | 在修復表分割槽時,將快速收集STATS(檔案數量和所有檔案的總大小),以避免HIVE轉移子中的順序列表。 |
spark.sql.sources.partitionColumnTypeInference.enabled | TRUE | 是否自動推斷分割槽列的資料型別 |
spark.sql.sources.bucketing.enabled | TRUE | 當false時,分桶表當作普通表處理 |
spark.sql.crossJoin.enabled | FALSE | 當false時,如果查詢中語法笛卡兒積 卻語法中沒有顯示join,將會丟擲異常 |
spark.sql.orderByOrdinal | TRUE | 當為true時,排序欄位放置到seleect List,否則被忽略 |
spark.sql.groupByOrdinal | TRUE | 當為true時,按組子句的序號被視為選擇列表中的位置。當為false時,序數被忽略。 |
spark.sql.groupByAliases | TRUE | group by後的別名是否能夠被用到 select list中,若為否將丟擲分析異常 |
spark.sql.sources.parallelPartitionDiscovery.threshold | 32 | 允許在driver端列出檔案的最大路徑數。如果在分割槽發現期間檢測到的路徑的數量超過該值,則嘗試用另一個SCAPLE分散式作業來列出檔案。這適用於parquet、ORC、CSV、JSON和LIbSVM資料來源。 |
spark.sql.sources.parallelPartitionDiscovery.parallelism | 10000 | 遞迴地列出路徑集合的並行數,設定阻止檔案列表生成太多工的序號 |
spark.sql.selfJoinAutoResolveAmbiguity | TRUE | 自動解決子連結中的連線條件歧義,修復bug SPARK-6231 |
spark.sql.retainGroupColumns | TRUE | 是否保留分組列 |
spark.sql.pivotMaxValues | 10000 | |
spark.sql.runSQLOnFiles | TRUE | 當為true,在sql查詢時,能夠使用dataSource.path作為表(eg:"select a,b from hdfs://xx/xx/*") |
spark.sql.codegen.wholeStage | TRUE | 當為true,多個運算元的整個stage將被便宜到一個java方法中 |
spark.sql.codegen.maxFields | 100 | 在啟用整個stage codegen之前支援的最大欄位(包括巢狀欄位) |
spark.sql.codegen.fallback | TRUE | 當為true,在整個stage的codegen,對於編譯generated code 失敗的query 部分,將會暫時關閉 |
spark.sql.codegen.maxCaseBranches | 20 | 支援最大的codegen |
spark.sql.files.maxPartitionBytes | 128 * 1024 * 1024 | 在讀取檔案時,一個分割槽最大被讀取的數量,預設值=parquet.block.size |
spark.sql.files.openCostInBytes | 4 * 1024 * 1024 | 為了測定開啟一個檔案的耗時,通過同時掃描配置的位元組數來測定,最好是過度估計,那麼小檔案的分割槽將比具有較大檔案的分割槽更快(首先排程 |
spark.sql.files.ignoreCorruptFiles | FALSE | 是否自動跳過不正確的檔案 |
spark.sql.files.maxRecordsPerFile | 0 | 寫入單個檔案的最大條數,如果時0或者負數,則無限制 |
spark.sql.exchange.reuse | TRUE | planer是否嘗試找出重複的 exchanges並複用 |
spark.sql.streaming.stateStore.minDeltasForSnapshot | 10 | 在合併成快照之前需要生成的狀態儲存增量檔案的最小數目 |
spark.sql.streaming.checkpointLocation | 檢查點資料流的查詢的預設儲存位置 | |
spark.sql.streaming.minBatchesToRetain | 100 | 流式計算最小批次長度 |
spark.sql.streaming.unsupportedOperationCheck | TRUE | streaming query的logical plan 檢查不支援的操作 |
spark.sql.variable.substitute | TRUE | |
spark.sql.codegen.aggregate.map.twolevel.enable | 啟用兩級聚合雜湊對映。當啟用時,記錄將首先“插入/查詢第一級、小、快的對映,然後在第一級滿或無法找到鍵時回落到第二級、更大、較慢的對映。當禁用時,記錄直接進入第二級。預設為真 | |
spark.sql.view.maxNestedViewDepth | 100 | 巢狀檢視中檢視引用的最大深度。巢狀檢視可以引用其他巢狀檢視,依賴關係被組織在有向無環圖(DAG)中。然而,DAG深度可能變得太大,導致意外的行為。此配置限制了這一點:當分析期間檢視深度超過該值時,我們終止解析度以避免潛在錯誤。 |
spark.sql.objectHashAggregate.sortBased.fallbackThreshold | 128 | 在ObjectHashAggregateExec的情況下,當記憶體中雜湊對映的大小增長過大時,我們將回落到基於排序的聚合。此選項為雜湊對映的大小設定行計數閾值。 |
spark.sql.execution.useObjectHashAggregateExec | TRUE | 是否使用 ObjectHashAggregateExec |
spark.sql.streaming.fileSink.log.deletion | TRUE | 是否刪除檔案流接收器中的過期日誌檔案 |
spark.sql.streaming.fileSink.log.compactInterval | 10 | 日誌檔案合併閾值,然後將所有以前的檔案壓縮到下一個日誌檔案中 |
spark.sql.streaming.fileSink.log.cleanupDelay | 10min | 保證一個日誌檔案被所有使用者可見的時長 |
spark.sql.streaming.fileSource.log.deletion | TRUE | 是否刪除檔案流源中過期的日誌檔案 |
spark.sql.streaming.fileSource.log.compactInterval | 10 | 日誌檔案合併閾值,然後將所有以前的檔案壓縮到下一個日誌檔案中 |
spark.sql.streaming.fileSource.log.cleanupDelay | 10min | 保證一個日誌檔案被所有使用者可見的時長 |
spark.sql.streaming.schemaInference | FALSE | 基於檔案的流,是否推斷它的模式 |
spark.sql.streaming.pollingDelay | 10L(MILLISECONDS) | 在沒有資料可用時延遲查詢新資料多長時間 |
spark.sql.streaming.noDataProgressEventInterval | 10000L(MILLISECONDS) | 在沒有資料的情況下,在兩個進度事件之間等待時間 |
spark.sql.streaming.metricsEnabled | FALSE | 是否為活動流查詢報告DoopWalth/CODAHALE度量 |
spark.sql.streaming.numRecentProgressUpdates | 100 | streaming query 保留的進度更新數量 |
spark.sql.statistics.ndv.maxError | 0.05 | 生成列級統計量時超對數G+++演算法允許的最大估計誤差 |
spark.sql.cbo.enabled | FALSE | 在設定true時啟用CBO來估計計劃統計資訊 |
spark.sql.cbo.joinReorder.enabled | FALSE | Enables join reorder in CBO. |
spark.sql.cbo.joinReorder.dp.threshold | 12 | The maximum number of joined nodes allowed in the dynamic programming algorithm |
spark.sql.cbo.joinReorder.card.weight | 0.07 | The weight of cardinality (number of rows) for plan cost comparison in join reorder: rows * weight + size * (1 - weight) |
spark.sql.cbo.joinReorder.dp.star.filter | FALSE | Applies star-join filter heuristics to cost based join enumeration |
spark.sql.cbo.starSchemaDetection | FALSE | When true, it enables join reordering based on star schema detection |
spark.sql.cbo.starJoinFTRatio | 0.9 | Specifies the upper limit of the ratio between the largest fact tables for a star join to be considered |
spark.sql.session.timeZone | TimeZone.getDefault.getID | 時間時區 |
spark.sql.windowExec.buffer.in.memory.threshold | 4096 | 視窗操作符保證儲存在記憶體中的行數的閾值 |
spark.sql.windowExec.buffer.spill.threshold | spark.sql.windowExec.buffer.in.memory.threshold | 視窗操作符溢位的行數的閾值 |
spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | Int.MaxValue | 由sortMergeJoin運算子保證儲存在記憶體中的行數的閾值 |
spark.sql.sortMergeJoinExec.buffer.spill.threshold | spark.sql.sortMergeJoinExec.buffer.in.memory.threshold | 由排序合併連線運算子溢位的行數的閾值 |
spark.sql.cartesianProductExec.buffer.in.memory.threshold | 4096 | 笛卡爾乘積運算元保證儲存在記憶體中的行數的閾值 |
spark.sql.cartesianProductExec.buffer.spill.threshold | spark.sql.cartesianProductExec.buffer.in.memory.threshold | 笛卡爾乘積運算元溢位的行數閾值 |
spark.sql.redaction.options.regex | "(?i)url".r |
即便join的hive表沒有10M,也沒有觸發 mapjoin[解決方案]
spark在join的時候,用來判斷一個表的大小是否達到了10M這個限制,是不會去計算這個表在hdfs上的具體的檔案大小的,而是使用hive metadata中的資訊,具體如下圖:
explain出來spark的執行計劃如下:
|
當有些hive沒有totalSize這個資訊的時候,spark就會用sortMergeJoin來做join了,可以使用下面的命令重新生成metadata資訊:
|