1. 程式人生 > 實用技巧 >大資料元件效能調優文件整理

大資料元件效能調優文件整理

12.1 配置原則

如何發揮叢集最佳效能

原則1:CPU核數分配原則 資料節點:建議預留2~4個核給OS和其他程序(資料庫,HBase等)外,其他的核分配給YARN。 控制節點:由於運行的程序較多,建議預留6~8個核。 原則2:記憶體分配 除了分配給OS、其他服務的記憶體外,剩餘的資源應盡量分配給YARN。 原則3:虛擬CPU個數分配 節點上YARN可使用的虛擬CPU個數建議配置為邏輯核數的1.5~2倍之間。如果上層計算應用對CPU的計算能力要求不高,可以配置為2倍的邏輯CPU。 原則4:提高磁碟IO吞吐率 儘可能掛載較多的盤,以提高磁碟IO吞吐率。

影響效能的因素

因素1:檔案服務器磁碟I/O
一般磁碟順序讀寫的速度為百兆級別,如第二代SATA盤順序讀的理論速度為300Mbps,只從一個盤里讀,若想達到1Gbps每秒的匯入速度是不可能的。並且若從一個磁碟讀,單純依靠增加map數來提高匯入速率也不一定可以。因為隨著map數變多,對於一個磁碟里的檔案讀,相當由順序讀變成了隨機讀,map數越多,磁碟讀取檔案的隨機性越強,讀取效能反而越差。如隨機讀最差可變成800Kbps。 因此需要想辦法增大檔案服務器的磁碟IO讀效率,可以使用專業的檔案服務器,如NAS系統,或者使用更簡單的方法,把多個磁碟進行Raid0或者Raid5。 因素2:檔案服務器網路頻寬 單個檔案服務器的網路頻寬越大越好,建議在10000Mb/s以上。 因素3:叢集節點硬體配置
叢集節點硬體配置越高,如CPU核數和記憶體都很多,可以增大同時運行的map或reduce個數,如果單個節點硬體配置難以提升,可以增加叢集節點數。 因素4:SFTP引數配置 不使用壓縮、加密演算法優先選擇aes128-cbc,完整性校驗演算法優先選擇[email protected] 因素5:叢集引數配置 因素6:Linux檔案預讀值 設定磁碟檔案預讀值大小為16384,使用linux命令: echo 16384 > /sys/block/sda/queue/read_ahead_kb
說明:
sda表示當前磁碟的磁碟名。

  

12.2 Manager

12.2.1 提升Manager配置服務引數的效率

操作場景 在安裝叢集或者擴容節點以後,叢集中可能新增了較多數量的節點。此時如果系統管理員在FusionInsight Manager上修改服務引數、儲存新配置並重啟服務時,Manager的Controller程序可能佔用大量記憶體,增加了CPU工作負荷,使用者需要等待一段時間才能完成引數修改。系統管理員可以根據實際業務使用情況,手動增加Controller的JVM啟動引數中記憶體引數,提升配置服務引數的效率。 對系統的影響 該操作需要在主管理節點重新啟動Controller,重啟期間會造成FusionInsight Manager暫時中斷。備管理節點Controller無需重啟。 前提條件 已確認主備管理節點IP。 操作步驟 1. 使用PuTTY,以omm使用者登入主管理節點。 2. 執行以下命令,切換目錄。 cd ${BIGDATA_HOME}/om-server/om/sbin 3. 執行以下命令修改Controller啟動引數檔案“controller.sh”,並儲存退出。 vi controller.sh 修改配置項“JAVA_HEAP_MAX”的引數值。例如,叢集中包含了400個以上的節點,建議修改如下,表示Controller最大可使用8GB記憶體: JAVA_HEAP_MAX=-Xmx8192m 4. 執行以下命令,重新啟動Controller。 sh ${BIGDATA_HOME}/om-server/om/sbin/restart-controller.sh 提示以下資訊表示命令執行成功: End into start-controller.sh 執行sh ${BIGDATA_HOME}/om-server/om/sbin/status-oms.sh,檢視Controller的“ResHAStatus”是否為“Normal”,並可以重新登入FusionInsight Manager表示重啟成功。 5. 使用PuTTY,以omm使用者登入備管理節點,並重復步驟 2~步驟 3。

12.2.2 根據叢集節點數優化Manager配置

操作場景 FusionInsight叢集規模不同時,Manager相關引數差異較大。在叢集容量調整前或者安裝叢集時,使用者可以手動指定Manager叢集節點數,系統將自動調整相關程序參 數。
說明:
在安裝叢集時,可以通過Manager安裝配置檔案中的“cluster_nodes_scale”引數指定叢集節點數。

  

操作步驟 1. 使用PuTTY,以omm使用者登入主管理節點。 2. 執行以下命令,切換目錄。 cd ${BIGDATA_HOME}/om-server/om/sbin 3. 執行以下命令檢視當前叢集Manager相關配置。 sh oms_confifig_info.sh -q 4. 執行以下命令指定當前叢集的節點數。 命令格式:sh oms_confifig_info.sh -s 節點數 例如: sh oms_confifig_info.sh -s 10 根據介面提示,輸入“y”: The following confifigurations will be modifified: Module Parameter Current Target Controller controller.Xmx 4096m => 8192m Controller controller.Xms 1024m => 2048m ... Do you really want to do this operation? (y/n): 介面提示以下資訊表示配置更新成功: ... Operation has been completed. Now restarting OMS server. [done] Restarted oms server successfully.
說明:
配置更新過程中,OMS會自動重啟。
相近數量的節點規模對應的Manager相關配置是通用的,例如100節點變為101節點,並沒有新的配置項需要重新整理。

  

12.3 HBase

12.3.1 提升BulkLoad效率

操作場景 批量載入功能採用了MapReduce jobs直接生成符合HBase內部資料格式的檔案,然後把生成的StoreFiles檔案載入到正在運行的叢集。使用批量載入相比直接使用 HBase的API會節約更多的CPU和網路資源。 ImportTSV是一個HBase的表資料載入工具。 前提條件 在執行批量載入時需要通過“Dimporttsv.bulk.output”引數指定檔案的輸出路徑。 操作步驟 引數入口:執行批量載入任務時,在BulkLoad命令行中加入如下引數。 表12­1 增強BulkLoad效率的配置項

引數

描述

配置的值

-

Dimporttsv.mapper.class

使用者自定義mapper通過把鍵值對的構造從mapper移動到reducer以幫助提高效能。mapper只需要把每一行的原始文字傳送給reducer, reducer解析每一行的每一條記錄並建立鍵值對。

org.apache.hadoop.hbase.mapreduce.TsvImporterByteMapper 和org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper

說明:

當該值配置

為“org.apache.hadoop.hbase.mapreduce.TsvImporterByteMapper”時,只在執行沒有HBASE_CELL_VISIBILITY OR HBASE_CELL_TTL選項的批

量載入命令時使用。使

用“org.apache.hadoop.hbase.mapreduce.TsvImporterByteMapper”時可以得到更好的效能。

12.3.2 提升連續put場景效能

操作場景 對大批量、連續put的場景,配置下面的兩個引數為“false”時能大量提升效能。 “hbase.regionserver.wal.durable.sync” “hbase.regionserver.hfifile.durable.sync” 當提升效能時,缺點是對於DataNode(預設是3個)同時故障時,存在小概率資料丟失的現象。對資料可靠性要求高的場景請慎重配置。 操作步驟

引數

描述

預設值

hbase.regionserver.wal.durable.sync

每一條wal是否持久化到硬碟。

true

hbase.regionserver.hfile.durable.sync

hfile寫是否立即持久化到硬碟。

true

12.3.3 Put和Scan效能綜合調優

操作場景 HBase有很多與讀寫效能相關的配置引數。讀寫請求負載不同的情況下,配置引數需要進行相應的調整,本章節旨在指導使用者通過修改RegionServer配置引數進行讀寫 效能調優。 操作步驟 JVM GC引數 RegionServer GC_OPTS引數設定建議: -Xms與-Xmx設定相同的值,需要根據實際情況設定,增大記憶體可以提高讀寫效能,可以參考引數“hfifile.block.cache.size”(見表12-4)和引數“hbase.regionserver.global.memstore.size”(見表12-3)的介紹進行設定。 -XX:NewSize與-XX:MaxNewSize設定相同值,建議低負載場景下設定為“512M”,高負載場景下設定為“2048M”。 -XX:CMSInitiatingOccupancyFraction建議設定為“100 * (hfifile.block.cache.size + hbase.regionserver.global.memstore.size + 0.05)”,最大值不 超過90。-XX:MaxDirectMemorySize表示JVM使用的堆外記憶體,建議低負載情況下設定為“512M”,高負載情況下設定為“2048M”。 Put相關引數 RegionServer處理put請求的資料,會將資料寫入memstore和hlog, 當memstore大小達到設定的“hbase.hregion.memstore.flflush.size”引數值大小時,memstore就會重新整理到HDFS生成HFile。 噹噹前region的列簇的HFile數量達到“hbase.hstore.compaction.min”引數值時會觸發compaction。 噹噹前region的列簇HFile數達到“hbase.hstore.blockingStoreFiles”引數值時會阻塞memstore重新整理生成HFile的操作,導致put請求阻塞。

引數

描述

預設值

hbase.regionserver.wal.durable.sync

每一條wal是否持久化到硬碟。參考 升連續put場景效能

true

hbase.regionserver.hfile.durable.sync

hfile寫是否立即持久化到硬碟。參考 升連續put場景效能

true

hbase.hregion.memstore.flush.size

建議設定為HDFS塊大小的整數倍,在記憶體足夠put負載大情況下可以調整增大。單位:位元組。

134217728

hbase.regionserver.global.memstore.size

建議設定為“hbase.hregion.memstore.flush.size * 寫活躍 region數 / RegionServer GC -Xmx”。預設值為“0.4”,表示使用RegionServer GC -Xmx的40%。

0.4

hbase.hstore.flusher.count

memstore的flush執行緒數,在put高負載場景下可以適當調大。

2

hbase.regionserver.thread.compaction.small

HFile compaction執行緒數,在put高負載情況下可以適當調大。

10

hbase.hstore.blockingStoreFiles

當列簇的HFile數達到該閾值,阻塞該region的所有操作,直到compcation完成,在put高負載場景下可以適當調大。

15

引數

描述

預設值

hbase.client.scanner.timeout.period

客戶端和RegionServer端引數,表示scan租約的時間,建議設定為60000ms的整數倍,在讀高負載情況下可以適當調大。單位:毫秒。

60000

hfile.block.cache.size

資料快取所佔的RegionServer GC -Xmx百分比,在讀高負載情況下可以適當調大以增大快取命中率以提高效能。預設值為“0.25”,表示使用RegionServer GC -Xmx的25%。

0.25

引數

描述

預設值

hbase.regionserver.handler.count

RegionServer上的RPC服務器實例數,建議設定為200 ~ 400 之間。

200

hbase.regionserver.metahandler.count

RegionServer中處理優先請求的程式實例的數量,建議設定為200 ~ 400之間。

100

12.3.4 提升實時寫資料效率

操作場景 需要把資料實時寫入到HBase中或者對於大批量、連續put的場景。 前提條件 呼叫HBase的put或delete介面,把資料儲存到HBase中。 操作步驟 寫資料服務端調優 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > HBase > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 表12­6 影響實時寫資料配置項

配置引數

描述

預設值


配置引數

描述

預設值

hbase.regionserver.wal.durable.sync

控制HLog檔案在寫入到HDFS時的同步程度。如果為true,HDFS在把資料寫入到硬碟後才返回;如果為false,HDFS在把資料寫入OS的快取後就返回。

把該值設定為false比true在寫入效能上會更優。

true

hbase.regionserver.hfile.durable.sync

控制HFile檔案在寫入到HDFS時的同步程度。如果為true,HDFS在把資料寫入到硬碟後才返回;如果為false,HDFS在把資料寫入OS的快取後就返回。

把該值設定為false比true在寫入效能上會更優。

true

GC_OPTS

HBase利用記憶體完成讀寫操作。提高HBase記憶體可以有效提高HBase 效能。GC_OPTS主要需要調整HeapSize的大小和NewSize的大小。調 整HeapSize大小的時候,建議將Xms和Xmx設定成相同的值,這樣可以避免JVM動態調整HeapSize大小的時候影響效能。調整NewSize大小的時候,建議把其設定為HeapSize大小的1/9。

HMaster:當HBase叢集規模越大、Region數量越多時,可以適當調大HMaster的GC_OPTS引數。RegionServer:RegionServer需要的記憶體一般比HMaster要

大。在記憶體充足的情況下,HeapSize可以相對設定大一些。

說明:

主HMaster的HeapSize為4G的時候,HBase叢集可以支援100000Region 數的規模。根據經驗值,單個RegionServer的HeapSize不建議超過20GB。

HMaster:

-Xms2G -Xmx2G - XX:NewSize=256M - XX:MaxNewSize=256M

RegionServer:

-Xms4G -Xmx4G - XX:NewSize=512M - XX:MaxNewSize=512M

hbase.regionserver.handler.count

表示RegionServer在同一時刻能夠併發處理多少請求。如果設定過高會導致激烈執行緒競爭,如果設定過小,請求將會在RegionServer長時間等待,降低處理能力。根據資源情況,適當增加處理執行緒數。

建議根據CPU的使用情況,可以選擇設定為100至300之間的值。

200

hbase.hregion.max.filesize

表示HBase中Region的檔案總大小的最大值。當Region中的檔案大於該引數時,將會導致Region分裂。 該引數設定過小時,可能會導致Split操作過於頻繁。當設定過大時,可能導致Compact需要處理的檔案大小增加,影響Compact執行效率。

10737418240(單位:位元組)

hbase.hregion.memstore.flush.size

在RegionServer中,當寫操作記憶體中存在超過memstore.flush.size大小的memstore,則MemStoreFlusher就啟動flush操作將該memstore 以hfile的形式寫入對應的store中。

如果RegionServer的記憶體充足,而且活躍Region數量也不是很多的時候,可以適當增大該值,可以減少compaction的次數,有助於提升系統性能。

同時,這種flush產生的時候,並不是緊急的flush,flush操作可能會有一定延遲,在延遲期間,寫操作還可以進行,Memstore還會繼續增大,最大值為“memstore.flush.size” * “hbase.hregion.memstore.block.multiplier”。當超過最大值時,將會阻塞操作。適當增大“hbase.hregion.memstore.block.multiplier”可以減少阻塞,減少效能波動。

134217728(單位:位元組)

hbase.regionserver.global.memstore.size

RegionServer中,負責flush操作的是MemStoreFlusher執行緒。該執行緒定期檢查寫操作記憶體,當寫操作佔用記憶體總量達到閾值, MemStoreFlusher將啟動flush操作,按照從大到小的順序,flush若干相對較大的memstore,直到所佔用記憶體小於閾值。

閾值 = “hbase.regionserver.global.memstore.size” * “hbase.regionserver.global.memstore.size.lower.limit” * “HBase_HEAPSIZE”

說明:

該配置與“hfile.block.cache.size”的和不能超過0.8,也就是寫和讀操作的記憶體不能超過HeapSize的80%,這樣可以保證除讀和寫外其它操作的正常運行。

0.4

hbase.hstore.blockingStoreFiles

在region flush前首先判斷file檔案個數,是否大於hbase.hstore.blockingStoreFiles。

如果大於需要先compaction並且讓flush延時90s(這個值可以通過hbase.hstore.blockingWaitTime進行配置),在延時過程中,將會繼續寫從而使得Memstore還會繼續增大超過最大值“memstore.flush.size” * “hbase.hregion.memstore.block.multiplier”,導致寫操作阻塞。當完成compaction後,可能就會產生大量寫入。這樣就導致效能激烈震盪。

增加hbase.hstore.blockingStoreFiles,可以減低BLOCK機率。

15

hbase.regionserver.thread.compaction.throttle

控制一次Minor Compaction時,進行compaction的檔案總大小的閾值。Compaction時的檔案總大小會影響這一次compaction的執行時間,如果太大,可能會阻塞其它的compaction或flush操作。

1610612736(單位:位元組)


配置引數

描述

預設值

hbase.hstore.compaction.min

當一個Store中檔案超過該值時,會進行compact,適當增大該值,可以減少檔案被重複執行compaction。但是如果過大,會導致Store中檔案數過多而影響讀取的效能。

6

hbase.hstore.compaction.max

控制一次compaction操作時的檔案數量的最大值。

與“hbase.hstore.compaction.max.size”的作用基本相同,主要是控制一次compaction操作的時間不要太長。

10

hbase.hstore.compaction.max.size

如果一個HFile檔案的大小大於該值,那麼在Minor Compaction操作中不會選擇這個檔案進行compaction操作,除非進行Major Compaction操作。

9223372036854775807(單

位:位元組)

這個值可以防止較大的HFile參與compaction操作。在禁止Major Compaction後,一個Store中可能存在幾個HFile,而不會合併成為一個HFile,這樣不會對資料讀取造成太大的效能影響。

hbase.hregion.majorcompaction

設定Major Compaction的執行週期。預設值為604800000毫秒。由於執行Major Compaction會佔用較多的系統資源,如果正在處於系統繁忙時期,會影響系統的效能。

如果業務沒有較多的更新、刪除、回收過期資料空間時,可以把該值設定為0,以禁止Major Compaction。

如果必須要執行Major Compaction,以回收更多的空間,可以適當增加該值,同時配置參

數“hbase.offpeak.end.hour”和“hbase.offpeak.start.hour”以控制 Major Compaction發生在業務空閒的時期。

604800000(單位:毫秒)

hbase.regionserver.maxlogs hbase.regionserver.hlog.blocksize

表示一個RegionServer上未進行Flush的Hlog的檔案數量的閾 值,如果大於該值,RegionServer會強制進行flush操作。

表示每個HLog檔案的最大大小。如果HLog檔案大小大於該值,就會滾動出一個新的HLog檔案,舊的將被禁用並歸檔。

這兩個引數共同決定了RegionServer中可以存在的未進行Flush的hlog 數量。當這個資料量小於MemStore的總大小的時候,會出現由於HLog檔案過多而觸發的強制flush操作。這個時候可以適當調整這兩個引數的大小,以避免出現這種強制flush的情況。

32

134217728(單位:位元組)

寫資料客戶端調優 寫資料時,在場景允許的情況下,最好使用Put List的方式,可以極大的提升寫效能。每一次Put的List的長度,需要結合單條Put的大小,以及實際環境的一些引數進行設定。建議在選定之前先做一些基礎的測試。 寫資料表設計調優 表12­7 影響實時寫資料相關引數

配置引數

描述

預設值

COMPRESSION

配置資料的壓縮演算法,這里的壓縮是HFile中block 級別的壓縮。對於可以壓縮的資料,配置壓縮演算法可以有效減少磁碟的IO,從而達到提高效能的目 的。

說明:

並非所有資料都可以進行有效壓縮。例如一張圖片的 資料,因為圖片一般已經是壓縮後的資料,所以壓縮 效果有限。 常用的壓縮演算法是SNAPPY,因為它有較好的Encoding/Decoding速度和可以接受的壓縮率。

NONE

BLOCKSIZE

配置HFile中block塊的大小,不同的block塊大小, 可以影響HBase讀寫資料的效率。越大的block

塊,配合壓縮演算法,壓縮的效率就越好;但是由於HBase的讀取資料是以block塊為單位的,所以越大的block塊,對於隨機讀的情況,效能可能會比較差。

如果要提升寫入的效能,一般擴大到128KB或者256KB,可以提升寫資料的效率,也不會影響太大的隨機讀效能。

65536(單位:位元組)

IN_MEMORY

配置這個表的資料優先快取在記憶體中,這樣可以有效提升讀取的效能。對於一些小表,而且需要頻繁進行讀取操作的,可以設定此配置項。

false

12.3.5 提升實時讀資料效率

操作場景 需要讀取HBase資料場景。 前提條件 呼叫HBase的get或scan介面,從HBase中實時讀取資料。 操作步驟 讀資料服務端調優 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > HBase > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 表12­8 影響實時寫資料配置項

配置引數

描述

預設值

GC_OPTS

HBase利用記憶體完成讀寫操作。提高HBase記憶體可以有效提高HBase效能。

HMaster:

-Xms2G -Xmx2G - XX:NewSize=256M - XX:MaxNewSize=256M RegionServer:

-Xms4G -Xmx4G - XX:NewSize=512M - XX:MaxNewSize=512M

GC_OPTS主要需要調整HeapSize的大小和NewSize的大小。調整HeapSize大小的時候,建議將Xms和Xmx設定成相同的值,這樣可以避免JVM動態調整HeapSize大小的時候影響效能。調整NewSize大小的時候,建議把其設定為HeapSize大小的1/9。

HMaster:當HBase叢集規模越大、Region數量越多時,可以適當調大HMaster的GC_OPTS引數。

RegionServer:RegionServer需要的記憶體一般比HMaster要 大。在記憶體充足的情況下,HeapSize可以相對設定大一些。

說明:

主HMaster的HeapSize為4G的時候,HBase叢集可以支援100000Region 數的規模。根據經驗值,單個RegionServer的HeapSize不建議超過20GB。

hbase.regionserver.handler.count

表示RegionServer在同一時刻能夠併發處理多少請求。如果設定過高會導致激烈執行緒競爭,如果設定過小,請求將會在RegionServer長時間等待,降低處理能力。根據資源情況,適當增加處理執行緒數。

建議根據CPU的使用情況,可以選擇設定為100至300之間的值。

200

hfile.block.cache.size

HBase快取區大小,主要影響查詢效能。根據查詢模式以及查詢記錄分佈情況來決定快取區的大小。如果採用隨機查詢使得快取區的命中率較低,可以適當降低快取區大小。

0.25

說明: 如果同時存在讀和寫的操作,這兩種操作的效能會互相影響。如果寫入導致的flush和Compaction操作頻繁發生,會佔用大量的磁碟IO操作,
從而影響 讀取的效能。如果寫入導致阻塞較多的Compaction操作,就會出現Region中存在多個HFile的情況,從而影響讀取的效能。
所以如果讀取的效能不理 想的時候,也要考慮寫入的配置是否合理。
讀資料客戶端調優 Scan資料時需要設定caching(一次從服務端讀取的記錄條數,預設是1),若使用預設值讀效能會降到極低。 當不需要讀一條資料所有的列時,需要指定讀取的列,以減少網路IO。 只讀取RowKey時,可以為Scan新增一個只讀取RowKey的fifilter(FirstKeyOnlyFilter或KeyOnlyFilter)。 讀資料表設計調優 表12­9 影響實時讀資料相關引數

配置引數

描述

預設值

COMPRESSION

配置資料的壓縮演算法,這里的壓縮是HFile中block 級別的壓縮。對於可以壓縮的資料,配置壓縮演算法可以有效減少磁碟的IO,從而達到提高效能的目 的。

說明:

並非所有資料都可以進行有效壓縮。例如一張圖片的 資料,因為圖片一般已經是壓縮後的資料,所以壓縮 效果有限。 常用的壓縮演算法是SNAPPY,因為它有較好的Encoding/Decoding速度和可以接受的壓縮率。

NONE

BLOCKSIZE

配置HFile中block塊的大小,不同的block塊大小, 可以影響HBase讀寫資料的效率。越大的block

塊,配合壓縮演算法,壓縮的效率就越好;但是由於HBase的讀取資料是以block塊為單位的,所以越大的block塊,對於隨機讀的情況,效能可能會比較差。

如果要提升寫入的效能,一般擴大到128KB或者256KB,可以提升寫資料的效率,也不會影響太大的隨機讀效能。

65536(單位:位元組)

DATA_BLOCK_ENCODING

配置HFile中block塊的編碼方法。當一行資料中存在多列時,一般可以配置為“FAST_DIFF”,可以有效的節省資料儲存的空間,從而提供效能。

NONE

12.3.6 JVM引數優化操作場景

當叢集資料量達到一定規模後,JVM的預設配置將無法滿足叢集的業務需求,輕則叢集變慢,重則叢集服務不可用。所以需要根據實際的業務情況進行合理的JVM引數 配置,提高叢集效能。 操作步驟 引數入口: HBase角色相關的JVM引數需要配置在“${HBASE_HOME}/conf”目錄下的“hbase-env.sh”檔案中。 每個角色都有各自的JVM引數配置變量,如表12-10。 表12­10 HBase相關JVM引數配置變量

變量名

變量影響的角色

HBASE_OPTS

該變量中設定的引數,將影響HBase的所有角色。

SERVER_GC_OPTS

該變量中設定的引數,將影響HBase Server端的所有角色,例如:Master、RegionServer 等。

CLIENT_GC_OPTS

該變量中設定的引數,將影響HBase的Client程序。

HBASE_MASTER_OPTS

該變量中設定的引數,將影響HBase的Master。

HBASE_REGIONSERVER_OPTS

該變量中設定的引數,將影響HBase的RegionServer。

HBASE_THRIFT_OPTS

該變量中設定的引數,將影響HBase的Thrift。

12.4 HDFS

12.4.1 提升寫效能

操作場景 在HDFS中,通過調整屬性的值,使得HDFS叢集更適應自身的業務情況,從而提升HDFS的寫效能。 操作步驟 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > HDFS > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 表12­11 HDFS寫效能優化配置

引數

描述

預設值

dfs.datanode.drop.cache.behind.reads

設定為true表示丟棄快取的資料(需要在DataNode中配置)。

當同一份資料,重複讀取的次數較少時,建議設定為true,使得快取能夠被其他操作使用。重複讀取的次數較多時,設定為false能夠提升重複讀取的速度。

true

dfs.client-write-packet-size

當HDFS Client往DataNode寫資料時,將資料生成一個包。然後將這個包在網路上傳出。此引數指定傳輸資料包的大小,可以通過各Job來指定。單位:位元組。

在萬兆網部署下,可適當增大該引數值,來提升傳輸的吞吐量。

262144

12.4.2 JVM引數優化

操作場景 當叢集資料量達到一定規模後,JVM的預設配置將無法滿足叢集的業務需求,輕則叢集變慢,重則叢集服務不可用。所以需要根據實際的業務情況進行合理的JVM引數 配置,提高叢集效能。 操作步驟 引數入口: HDFS角色相關的JVM引數需要配置在“${HADOOP_HOME}/etc/hadoop”目錄下的“hadoop-env.sh”檔案中。 JVM各引數的含義請參見其官網:http://docs.oracle.com/javase/8/docs/technotes/tools/unix/java.html 每個角色都有各自的JVM引數配置變量,如表12-12。 表12­12 HDFS相關JVM引數配置變量

變量名

變量影響的角色

HADOOP_OPTS

該變量中設定的引數,將影響HDFS的所有角色。

HADOOP_NAMENODE_OPTS

該變量中設定的引數,將影響HDFS的NameNode。

HADOOP_DATANODE_OPTS

該變量中設定的引數,將影響HDFS的DataNode。

HADOOP_JOURNALNODE_OPTS

該變量中設定的引數,將影響HDFS的JournalNode。

變量名

變量影響的角色

HADOOP_ZKFC_OPTS

該變量中設定的引數,將影響HDFS的ZKFC。

HADOOP_SECONDARYNAMENODE_OPTS

該變量中設定的引數,將影響HDFS的SecondaryNameNode。

HADOOP_CLIENT_OPTS

該變量中設定的引數,將影響HDFS的Client程序。

HADOOP_BALANCER_OPTS

該變量中設定的引數,將影響HDFS的Balancer程序。

HADOOP_MOVER_OPTS

該變量中設定的引數,將影響HDFS的Mover程序。

配置方式舉例: export HADOOP_NAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,N

12.4.3 使用客戶端元資料快取提高讀取效能

操作場景 通過使用客戶端快取元資料塊的位置來提高HDFS讀取效能。
說明:
此功能僅用於讀取不經常修改的檔案。因為在服務器端由某些其他客戶端完成的資料修改,對於快取記憶體的客戶端將是不可見的,
這可能導致從快取中拿到的元資料是過期的。 
操作步驟 設定引數的路徑: 在FusionInsight Manager頁面中,選擇“服務管理 > HDFS > 服務配置”,將“引數類別”設定為“全部配置”,並在搜尋框中輸入引數名稱。 表12­13 引數配置

引數

描述

預設值

dfs.client.metadata.cache.enabled

啟用/禁用塊位置元資料的客戶端快取。將此引數設定為“true”,搭配“dfs.client.metadata.cache.pattern”引數以啟用快取。

false

dfs.client.metadata.cache.pattern

需要快取的檔案路徑的正則表示式模式。只有這些檔案的塊位置元資料被快取,直到這些元資料過期。此配置僅在引數“dfs.client.metadata.cache.enabled”設定為“true”時有效。

示例:“/test.*”表示讀取其路徑是以“/test”開頭的所有檔案。

說明:

為確保一致性,配置特定模式以僅快取其他客戶端不經常修改的檔案。

正則表示式模式將僅驗證URI的path部分,而不驗證在Fully Qualified路徑情況下的schema和authority。

<empty>

dfs.client.metadata.cache.expiry.sec

快取元資料的持續時間。快取條目在該持續時間過期後失效。即使在快取過程中經常使用的元數 據也會發生失效。

配置值可採用時間字尾s/m/h表示,分別表示秒,分鐘和小時。

說明:

若將該引數配置為“0s”,將禁用快取功能。

60s

dfs.client.metadata.cache.max.entries

快取一次最多可儲存的非過期資料條目。

65536

說明:
要在過期前完全清除客戶端快取,可呼叫DFSClient#clearLocatedBlockCache()。
用法如下所示。
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem dfs = (DistributedFileSystem) fs;
DFSClient dfsClient = dfs.getClient();
dfsClient.clearLocatedBlockCache();

  

12.4.4 使用當前活動快取提升客戶端與NameNode的連線效能

操作場景 HDFS部署在具有多個NameNode實例的HA(High Availability)模式中,HDFS客戶端需要依次連線到每個NameNode,以確定當前活動的NameNode是什麼,並將其用於客戶端操作。 一旦識別出來,當前活動的NameNode的詳細資訊就可以被快取並共享給在客戶端機器中運行的所有客戶端。這樣,每個新客戶端可以首先嚐試從快取載入活動的 Name Node的詳細資訊,並將RPC呼叫儲存到備用的NameNode。在異常情況下有很多優勢,例如當備用的NameNode連線長時間不響應時。 當發生故障,將另一個NameNode切換為活動狀態時,快取的詳細資訊將被更新為當前活動的NameNode的資訊。 操作步驟 設定引數的路徑如下:在FusionInsight Manager頁面中,選擇“服務管理 > HDFS > 服務配置”,將“引數類別”設定為“全部配置”,並在搜尋框中輸入引數名稱。 表12­14 配置引數

引數

描述

預設值

dfs.client.failover.proxy.provider.[nameservice ID]

配置客戶端Failover proxy provider類,該類使用傳遞的協議建立NameNode proxy。該引數可以被配置

為“org.apache.hadoop.hdfs.server.namenode.ha.BlackListingFailoverProxyProvider”或者“org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider”。

org.apache.hadoop.hd

dfs.client.failover.activeinfo.share.flag

啟用快取並將當前活動的NameNode的詳細資訊共享給其他客戶端。若要啟用快取,需將其設定為“true”。

false

dfs.client.failover.activeinfo.share.path

指定將在機器中的所有客戶端建立的共享檔案的本地目錄。如果要為不同用戶共享快取, 該資料夾應具有必需的許可權(如在給定目錄中建立,讀寫快取檔案)。

/tmp

dfs.client.failover.activeinfo.share.io.timeout.sec

控制超時的可選配置。用於在讀取或寫入快取檔案時獲取鎖定。如果在該時間內無法獲取 快取檔案上的鎖定,則放棄嘗試讀取或更新快取。單位為秒。

5

說明:
由HDFS客戶端建立的快取檔案必須由其他客戶端重新使用。因此,這些檔案永遠不會從本地系統中刪除。若禁用該功能,可能需要進行手動清理。  

12.5 Hive

12.5.1 建立表分割槽

操作場景 Hive在做Select查詢時,一般會掃描整個表內容,會消耗較多時間去掃描不關注的資料。此時,可根據業務需求及其查詢維度,建立合理的表分割槽,從而提高查詢效 率。 操作步驟 1. 使用PuTTY工具,以root使用者登入已安裝Hive客戶端的節點。 2. 執行以下命令,進入客戶端安裝目錄,例如“/opt/client”。 cd /opt/client 3. 執行source bigdata_env命令,配置客戶端環境變量。 4. 在客戶端中執行如下命令,執行登入操作。 kinit 使用者名稱 5. 執行以下命令登入客戶端工具。 beeline 6. 指定靜態分割槽或者動態分割槽。 靜態分割槽: 靜態分割槽是手動輸入分割槽名稱,在建立表時使用關鍵字PARTITIONED BY指定分割槽列名及資料型別。應用開發時,使用ALTER TABLE ADD PARTITION語句增加分割槽,以及使用LOAD DATA INTO PARTITON語句將資料載入到分割槽時,只能靜態分割槽。 動態分割槽:通過查詢命令,將結果插入到某個表的分割槽時,可以使用動態分割槽。 動態分割槽通過在客戶端工具執行如下命令來開啟: set hive.exec.dynamic.partition=true 動態分割槽預設模式是strict,也就是必須至少指定一列為靜態分割槽,在靜態分割槽下建立動態子分割槽,可以通過如下設定來開啟完全的動態分割槽: set hive.exec.dynamic.partition.mode=nonstrict
說明:
1. 動態分割槽可能導致一個DML語句建立大量的分割槽,對應的建立大量新資料夾,對系統性能可能帶來影響。
2. 在檔案數量大的情況下,執行一個SQL語句啟動時間較長,可以在執行SQL語句之前執行“set mapreduce.input.fifileinputformat.list-status.num
threads = 100;”語句來縮短啟動時間。“mapreduce.input.fifileinputformat.list-status.num-threads”引數需要先新增到Hive的白名單才可設定。 

12.5.2 Join優化

操作場景 使用Join語句時,如果資料量大,可能造成命令執行速度和查詢速度慢,此時可進行Join優化。 Join優化可分為以下方式: Map Join Sort Merge Bucket Map Join Join順序優化 Map Join Hive的Map Join適用於能夠在記憶體中存放下的小表(指表大小小於25MB),通過“hive.mapjoin.smalltable.fifilesize”定義小表的大小,預設為25MB。Map Join的方法有兩種: 使用/*+ MAPJOIN(join_table) */。 執行語句前設定如下引數,當前版本中該值預設為true。 set hive.auto.convert.join=true 使用Map Join時沒有Reduce任務,而是在Map任務前起了一個MapReduce Local Task,這個Task通過TableScan讀取小表內容到本機,在本機以HashTable的形式 儲存並寫入硬碟上傳到DFS,並在distributed cache中儲存,在Map Task中從本地磁碟或者distributed cache中讀取小表內容直接與大表join得到結果並輸出。 使用Map Join時需要注意小表不能過大,如果小表將記憶體基本用盡,會使整個系統性能下降甚至出現記憶體溢位的異常。 Sort Merge Bucket Map Join 使用Sort Merge Bucket Map Join必須滿足以下2個條件: 1. join的兩張表都很大,記憶體中無法存放。 2. 兩張表都按照join key進行分桶(clustered by (column))和排序(sorted by(column)),且兩張表的分桶數正好是倍數關係。 通過如下設定,啟用Sort Merge Bucket Map Join: set hive.optimize.bucketmapjoin=true set hive.optimize.bucketmapjoin.sortedmerge=true 這種Map Join也沒有Reduce任務,是在Map任務前啟動MapReduce Local Task,將小表內容按桶讀取到本地,在本機儲存多個桶的HashTable備份並寫入HDFS,並儲存在Distributed Cache中,在Map Task中從本地磁碟或者Distributed Cache中按桶一個一個讀取小表內容,然後與大表做匹配直接得到結果並輸出。 Join順序優化 當有3張及以上的表進行Join時,選擇不同的Join順序,執行時間存在較大差異。使用恰當的Join順序可以有效縮短任務執行時間。 Join順序原則: Join出來結果較小的組合,例如表資料量小或兩張表Join後產生結果較少,優先執行。 Join出來結果大的組合,例如表資料量大或兩張表Join後產生結果較多,在後面執行。 例如,customer表的資料量最多,orders表和lineitem表優先Join可獲得較少的中間結果。 原有的Join語句如下:
select
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_mktsegment = 'BUILDING'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < '1995-03-22'
and l_shipdate > '1995-03-22'
limit 10;
Join順序優化後如下:
select
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
o_orderdate,
o_shippriority
from
orders,
lineitem,
customer
where
c_mktsegment = 'BUILDING'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < '1995-03-22'
and l_shipdate > '1995-03-22'
limit 10;

  

注意事項 Join資料傾斜問題 執行任務的時候,任務進度長時間維持在99%,這種現象叫資料傾斜。 資料傾斜是經常存在的,因為有少量的Reduce任務分配到的資料量和其他Reduce差異過大,導致大部分Reduce都已完成任務,但少量Reduce任務還沒完成的情況。 解決資料傾斜的問題,可通過設定set hive.optimize.skewjoin=true並調整hive.skewjoin.key的大小。hive.skewjoin.key是指Reduce端接收到多少個key即認為資料是 傾斜的,並自動分發到多個Reduce。

12.5.3 Group By優化操作場景

優化Group by語句,可提升命令執行速度和查詢速度。 Group by的時候, Map端會先進行分組, 分組完後分發到Reduce端, Reduce端再進行分組。可採用Map端聚合的方式來進行Group by優化,開啟Map端初步聚 合,減少Map的輸出資料量。 操作步驟 在Hive客戶端進行如下設定: set hive.map.aggr=true 注意事項 Group By資料傾斜 Group By也同樣存在資料傾斜的問題,設定hive.groupby.skewindata為true,生成的查詢計劃會有兩個MapReduce Job,第一個Job的Map輸出結果會隨機的分佈到 Reduce中,每個Reduce做聚合操作,並輸出結果,這樣的處理會使相同的Group By Key可能被分發到不同的Reduce中,從而達到負載均衡,第二個Job再根據預處 理的結果按照Group By Key分發到Reduce中完成最終的聚合操作。 Count Distinct聚合問題 當使用聚合函式count distinct完成去重計數時,處理值為空的情況會使Reduce產生很嚴重的資料傾斜,可以將空值單獨處理,如果是計算count distinct,可以通過 where字句將該值排除掉,並在最後的count distinct結果中加1。如果還有其他計算,可以先將值為空的記錄單獨處理,再和其他計算結果合併。 12.5.4 資料儲存優化 操作場景 “ORC”是一種高效的列儲存格式,在壓縮比和讀取效率上優於其他檔案格式。 建議使用“ORC”作為Hive表預設的儲存格式。 前提條件 已登入Hive客戶端,具體操作請參見《管理員指南》的“使用Hive客戶端”。 操作步驟 推薦:使用“SNAPPY”壓縮,適用於壓縮比和讀取效率要求均衡場景。 Create table xx stored as orc tblproperties ("orc.compress"="SNAPPY") 可用:使用“ZLIB”壓縮,適用於壓縮比要求較高場景。 Create table xx stored as orc tblproperties ("orc.compress"="ZLIB") 說明: xx為具體使用的Hive表名。 12.5.5 SQL優化 操作場景 在Hive上執行SQL語句查詢時,如果語句中存在“(a&b) or (a&c)”邏輯時,建議將邏輯改為“a & (b or c)”。 樣例 假設條件a為“p_partkey = l_partkey”,優化前樣例如下所示: select sum(l_extendedprice* (1 - l_discount)) as revenue from lineitem, part where ( p_partkey = l_partkey and p_brand = 'Brand#32' and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') and l_quantity >= 7 and l_quantity <= 7 + 10 and p_size between 1 and 5 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) or ( p_partkey = l_partkey and p_brand = 'Brand#35' and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') and l_quantity >= 15 and l_quantity <= 15 + 10 and p_size between 1 and 10 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) or ( p_partkey = l_partkeyand p_brand = 'Brand#24' and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') and l_quantity >= 26 and l_quantity <= 26 + 10 and p_size between 1 and 15 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) 優化後樣例如下所示: select sum(l_extendedprice* (1 - l_discount)) as revenue from lineitem, part where p_partkey = l_partkey and (( p_brand = 'Brand#32' and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') and l_quantity >= 7 and l_quantity <= 7 + 10 and p_size between 1 and 5 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) or ( p_brand = 'Brand#35' and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') and l_quantity >= 15 and l_quantity <= 15 + 10 and p_size between 1 and 10 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' ) or ( p_brand = 'Brand#24' and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') and l_quantity >= 26 and l_quantity <= 26 + 10 and p_size between 1 and 15 and l_shipmode in ('AIR', 'AIR REG') and l_shipinstruct = 'DELIVER IN PERSON' )) 12.5.6 使用Hive CBO優化查詢 操作場景 在Hive中執行多表Join時,Hive支援開啟CBO(Cost Based Optimization),系統會自動根據表的統計資訊,例如資料量、檔案數等,選出最優計劃提高多表Join的 效率。Hive需要先收集表的統計資訊後才能使CBO正確的優化。 說明: CBO優化器會基於統計資訊和查詢條件,儘可能地使join順序達到最優。但是也可能存在特殊情況導致join順序調整不準確。例如資料存在傾斜,以及 查詢條件值在表中不存在等場景,可能調整出非優化的join順序。 開啟列統計資訊自動收集時,需要在reduce側做聚合統計。對於沒有reduce階段的insert任務,將會多出reduce階段,用於收集統計資訊。 前提條件 已登入Hive客戶端,具體操作請參見《管理員指南》的“使用Hive客戶端”。 操作步驟 1. 在Manager介面Hive元件的服務配置中搜索“hive.cbo.enable”引數,選中“true”永久開啟功能或者通過以下命令臨時開啟功能: set hive.cbo.enable=true; 2. 手動收集Hive表已有資料的統計資訊。 執行以下命令,可以手動收集統計資訊。僅支援統計一張表,如果需要統計不同的表需重複執行。 ANALYZE TABLE [db_name.]tablename [PARTITION(partcol1[=val1], partcol2[=val2], ...)] COMPUTE STATISTICS [FOR COLUMNS] [NOSCAN]; 說明: 指定FOR COLUMNS時,收集列級別的統計資訊。 指定NOSCAN時,將只統計檔案大小和個數,不掃描具體檔案。 例如: analyze table table_name compute statistics; analyze table table_name compute statistics for columns;3. 配置Hive自動收集統計資訊。開啟配置後,執行insert overwrite/into命令插入資料時才自動統計新資料的資訊。 在Hive客戶端執行以下命令臨時開啟收集: set hive.stats.autogather = true; 開啟表/分割槽級別的統計資訊自動收集。 set hive.stats.column.autogather = true; 開啟列級別的統計資訊自動收集。 說明: 列級別統計資訊的收集不支援複雜的資料型別,例如Map,Struct等。 表級別統計資訊的自動收集不支援Hive on HBase表。 在Manager介面Hive的服務配置中,搜尋引數“hive.stats.autogather”和“hive.stats.column.autogather”,選中“true”永久開啟收集功能。 4. 執行以下命令可以檢視統計資訊。 DESCRIBE FORMATTED table_name[.column_name] PARTITION partition_spec; 例如: desc formatted table_name; desc formatted table_name.id; desc formatted table_name.id partition(time='2016-05-27'); 說明: 分割槽表僅支援分割槽級別的統計資訊收集,因此分割槽表需要指定分割槽來查詢統計資訊。 12.6 Kafka 12.6.1 Kafka效能調優 操作場景 通過調整Kafka服務端引數,可以提升特定業務場景下Kafka的處理能力。 引數入口:在FusionInsight Manager系統中,選擇“服務管理 > Kafka > 服務配置”,“引數類別”設定為“全部配置”。在搜尋框中輸入引數名稱。 引數調優 表12­15 調優引數 配置引數 預設值 調優場景 num.recovery.threads.per.data.dir 10 在Kafka啟動過程中,資料量較大情況下,可調大此引數,可以提升啟動 速度。 background.threads 10 Broker後臺任務處理的執行緒數目。資料量較大的情況下,可適當調大此 引數,以提升Broker處理能力。 num.replica.fetchers 1 副本向Leader請求同步資料的執行緒數,增大這個數值會增加副本的I/O並 發度。 num.io.threads 8 Broker用來處理磁碟I/O的執行緒數目,這個執行緒數目建議至少等於硬碟的 個數。 KAFKA_HEAP_OPTS -Xmx6G Kafka JVM堆記憶體設定。當Broker上資料量較大時,應適當調整堆記憶體 大小。 12.7 MapReduce 12.7.1 多CPU核心下的調優配置 操作場景 當CPU核心數很多時,如CPU核心為磁碟數的3倍時的調優配置。 操作步驟 以下引數有如下兩個配置入口: 服務器端配置 在FusionInsight Manager系統中,選擇“服務管理 > YARN > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 客戶端配置 直接在客戶端中修改相應的配置檔案。 說明: HDFS客戶端配置檔案路徑:客戶端安裝目錄/HDFS/hadoop/etc/hadoop/hdfs-site.xml。 Yarn客戶端配置檔案路徑:客戶端安裝目錄/HDFS/hadoop/etc/hadoop/yarn-site.xml。MapReduce客戶端配置檔案路徑:客戶端安裝目錄/HDFS/hadoop/etc/hadoop/mapred-site.xml。 表12­16 多CPU核心設定 配置 描述 引數 預設值 Server/Client 節點 容器 槽位 數 如下配置組合決定了每節點任務(map、reduce)的 併發數。 “yarn.nodemanager.resource.memory- mb” “mapreduce.map.memory.mb” “mapreduce.reduce.memory.mb” yarn.nodemanager.resource.memory- mb 說明: 需要在FusionInsight Manager系統進行配 置。 8192 Server mapreduce.map.memory.mb 說明: 需要在客戶端進行配置,配置檔案路徑:客 戶端安裝目 錄/HDFS/hadoop/etc/hadoop/mapred- site.xml。 4096 Client mapreduce.reduce.memory.mb 說明: 需要在客戶端進行配置,配置檔案路徑:客 戶端安裝目 錄/HDFS/hadoop/etc/hadoop/mapred- site.xml。 4096 Client Map 輸出 與壓 縮 Map任務所產生的輸出可以在寫入磁碟之前被壓 縮,這樣可以節約磁碟空間並得到更快的寫盤速 度,同時可以減少至Reducer的資料傳輸量。需要 在客戶端進行配置。 mapreduce.map.output.compress指定了 Map任務輸出結果可以在網路傳輸前被壓 縮。這是一個per-job的配置。 mapreduce.map.output.compress.codec 指定用於壓縮的編解碼器。 mapreduce.map.output.compress 說明: 需要在客戶端進行配置,配置檔案路徑:客 戶端安裝目 錄/HDFS/hadoop/etc/hadoop/mapred- site.xml。 true Client mapreduce.map.output.compress.codec 說明: 需要在客戶端進行配置,配置檔案路徑:客 戶端安裝目 錄/HDFS/hadoop/etc/hadoop/mapred- site.xml。 org.apache.hadoop.io.compress.SnappyCodec Client Spills mapreduce.map.sort.spill.percent mapreduce.map.sort.spill.percent 說明: 需要在客戶端進行配置,配置檔案路徑:客 戶端安裝目 錄/HDFS/hadoop/etc/hadoop/mapred- site.xml。 0.8 Client 資料 包大 小 當HDFS客戶端寫資料至資料節點時,資料會被累 積,直到形成一個包。然後這個資料包會通過網路 傳輸。dfs.client-write-packet-size配置項可以指 定該資料包的大小。這個可以通過每個job進行指 定。 dfs.client-write-packet-size 說明: 需要在客戶端進行配置,配置檔案路徑:客 戶端安裝目 錄/HDFS/hadoop/etc/hadoop/hdfs- site.xml。 262144 Client 12.7.2 確定Job基線 操作場景 確定Job基線是調優的基礎,一切調優項效果的檢查,都是通過和基線資料做對比來獲得。 Job基線的確定有如下三個原則: 充分利用叢集資源 reduce階段盡量放在一輪 每個task的執行時間要合理 操作步驟 原則一:充分利用叢集資源。Job運行時,會讓所有的節點都有任務處理,且處於繁忙狀態,這樣才能保證資源充分利用,任務的併發度達到最大。可以通過調整處理的資料量大小,以及 調整map和reduce個數來實現。 Reduce個數的控制使用“mapreduce.job.reduces”。 Map個數取決於使用了哪種InputFormat,以及待處理的資料檔案是否可分割。預設的TextFileInputFormat將根據block的個數來分配map數(一個block一個 map)。通過如下配置引數進行調整。 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > YARN > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 引數 描述 預設值 mapreduce.input.fifileinputformat.split.maxsize 可以設定資料分片的資料最大值。 由使用者定義的分片大小的設定及每個檔案block大小的設定,可以計算分 片的大小。計算公式如下: splitSize = Math.max(minSize, Math.min(maxSize, blockSize)) 如果maxSize設定大於blockSize,那麼每個block就是一個分片,否則 就會將一個block檔案分隔為多個分片,如果block中剩下的一小段資料 量小於splitSize,還是認為它是獨立的分片。 - mapreduce.input.fifileinputformat.split.minsize 可以設定資料分片的資料最小值。 0 原則二:控制reduce階段在一輪中完成。 避免以下兩種場景: 大部分的reduce在第一輪運行完後,剩下唯一一個reduce繼續運行。這種情況下,這個reduce的執行時間將極大影響這個job的運行時間。因此需要 將reduce個數減少。 所有的map運行完後,只有個別節點有reduce在運行。這時候叢集資源沒有得到充分利用,需要增加reduce的個數以便每個節點都有任務處理。 原則三:每個task的執行時間要合理。 如果一個job,每個map或reduce的執行時間只有幾秒鐘,就意味著這個job的大部分時間都消耗在task的排程和程序啟停上了,因此需要增加每個task處理的 資料大小。建議一個task處理時間為1分鐘。 控制單個task處理時間的大小,可以通過如下配置來調整。 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > YARN > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 引數 描述 預設值 mapreduce.input.fifileinputformat.split.maxsize 可以設定資料分片的資料最大值。 由使用者定義的分片大小的設定及每個檔案block大小的設定,可以計算分 片的大小。計算公式如下: splitSize = Math.max(minSize, Math.min(maxSize, blockSize)) 如果maxSize設定大於blockSize,那麼每個block就是一個分片,否則 就會將一個block檔案分隔為多個分片,如果block中剩下的一小段資料 量小於splitSize,還是認為它是獨立的分片。 - mapreduce.input.fifileinputformat.split.minsize 可以設定資料分片的資料最小值。 0 12.7.3 Shufflfflffle調優 操作場景 Shufflfflffle階段是MapReduce效能的關鍵部分,包括了從Map task將中間資料寫到磁碟一直到Reduce task拷貝資料並最終放到reduce函式的全部過程。這一塊Hadoop 提供了大量的調優引數。 圖12­1 Shuffle過程 操作步驟 1. Map階段的調優 判斷Map使用的記憶體大小 判斷Map分配的記憶體是否足夠,一個簡單的辦法是檢視運行完成的job的Counters中,對應的task是否發生過多次GC,以及GC時間佔總task運行時 間之比。通常,GC時間不應超過task運行時間的10%,即GC time elapsed (ms)/CPU time spent (ms)<10%。 主要通過如下引數進行調整。引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > Yarn > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 表12­17 引數說明 引數 描述 預設值 mapreduce.map.memory.mb 設定Map排程記憶體。 4096 mapreduce.map.java.opts 設定Map程序JVM引數。 -Xmx2048M - Djava.net.preferIPv4Stack=true 建議:配置“mapreduce.map.java.opts”引數中“-Xmx”值為“mapreduce.map.memory.mb”引數值的0.8倍。 使用Combiner 在Map階段,有一個可選過程,將同一個key值的中間結果合併,叫做combiner。一般將reduce類設定為combiner即可。通過combine,一般情況 下可以顯著減少Map輸出的中間結果,從而減少shufflfflffle過程的網路頻寬佔用。可通過如下介面為一個任務設定Combiner類。 表12­18 Combiner設定介面 類名 介面名 描述 org.apache.hadoop.mapreduce.Job public void setCombinerClass(Class<? extends Reducer> cls) 為Job設定一個combiner 類。 2. Copy階段的調優 資料是否壓縮 對Map的中間結果進行壓縮,當資料量大時,會顯著減少網路傳輸的資料量,但是也因為多了壓縮和解壓,帶來了更多的CPU消耗。因此需要做好 權衡。當任務屬於網路瓶頸型別時,壓縮Map中間結果效果明顯。針對bulkload調優,壓縮中間結果後效能提升60%左右。 配置方法:將“mapreduce.map.output.compress”引數值設定為“true”,將“mapreduce.map.output.compress.codec”引數值設定 為“org.apache.hadoop.io.compress.SnappyCodec”。 3. Merge階段的調優 通過調整如下引數減少reduce寫磁碟的次數。 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > YARN > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 表12­19 引數說明 引數 描述 預設值 mapreduce.reduce.merge.inmem.threshold 允許多少個檔案同時存在reduce記憶體里。當達到這個閾值時, reduce就會觸發mergeAndSpill,將資料寫到硬碟上。 1000 mapreduce.reduce.shufflfflffle.merge.percent 當reduce中存放map中間結果的buffffer使用達到多少百分比時,會 觸發merge操作。 0.66 mapreduce.reduce.shufflfflffle.input.buffffer.percent 允許map中間結果佔用reduce堆大小的百分比。 0.70 mapreduce.reduce.input.buffffer.percent 當開始執行reduce函式時,允許map檔案佔reduce堆大小的百分 比。 當map檔案比較小時,可以將這個值設定成1.0,這樣可以避免 reduce將拷貝過來的map中間結果寫磁碟。 0 12.7.4 大任務的AM調優 操作場景 任務場景:運行的一個大任務(map總數達到了10萬的規模),但是一直沒有跑成功。經過查詢,發現是ApplicationMaster(以下簡稱AM)反應緩慢,最終超時失 敗。 此任務的問題是,task數量變多時,AM管理的物件也線性增長,因此就需要更多的記憶體來管理。AM預設分配的記憶體堆大小是1GB。 操作步驟 通過調大如下的引數來進行AM調優。 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > Yarn > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 引數 描述 預設值 yarn.app.mapreduce.am.resource.mb 該引數值必須大於下面引數的堆大小。單位: MB 1536 yarn.app.mapreduce.am.command- opts 傳遞到MapReduce ApplicationMaster的 JVM啟動引數。 -Xmx1024m -XX:CMSFullGCsBeforeCompaction=1 - XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled - XX:+UseCMSCompactAtFullCollection -verbose:gc 12.7.5 推測執行操作場景 當叢集規模很大時(如幾百上千臺節點的叢集),個別機器出現軟硬體故障的概率就變大了,並且會因此延長整個任務的執行時間(跑完的任務都在等出問題的機器跑 結束)。推測執行通過將一個task分給多臺機器跑,取先運行完的那個,會很好的解決這個問題。對於小叢集,可以將這個功能關閉。 操作步驟 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > YARN > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 引數 描述 預設值 mapreduce.map.speculative 是否開啟map的推測執行。true表示開啟。 false mapreduce.reduce.speculative 是否開啟reduce的推測執行。true表示開啟。 false 12.7.6 通過“Slow Start”調優 操作場景 Slow Start特性指定Map任務完成度為多少時Reduce任務可以啟動,過早啟動Reduce任務會導致資源佔用,影響任務運行效率,但適當的提早啟動Reduce任務會提高 Shufflfflffle階段的資源利用率,提高任務運行效率。例如:某叢集可啟動10個Map任務,MapReduce作業共15個Map任務,那麼在一輪Map任務執行完成後只剩5個Map 任務,叢集還有剩餘資源,在這種場景下,配置Slow Start引數值小於1,比如0.8,則Reduce就可以利用叢集剩餘資源。 操作步驟 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > MapReduce > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 引數 描述 預設值 mapreduce.job.reduce.slowstart.completedmaps 當多少佔比的Map執行完後開始執行Reduce。預設100%的Map跑完後開始 起Reduce。 1 12.7.7 MR job commit階段優化 操作場景 預設情況下,如果一個MR任務會產生大量的輸出結果檔案,那麼該job在最後的commit階段會耗費較長的時間將每個task的臨時輸出結果commit到最終的結果輸出目 錄。特別是在大叢集中,大Job的commit過程會嚴重影響任務的效能表現。 針對以上情況,可以通過將以下引數“mapreduce.fifileoutputcommitter.algorithm.version”配置為“2”,來提升MR Job commit階段的效能。 操作步驟 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > Yarn > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 表12­20 引數說明 引數 描述 預設值 mapreduce.fifileoutputcommitter.algorithm.version 用於指定Job的最終輸出檔案提交的演算法版本,取值 為“1”或“2”。 說明: 版本2為建議的優化演算法版本。該演算法通過讓任務直接將 每個task的輸出結果提交到最終的結果輸出目錄,從而減 少大作業的輸出提交時間。 2 12.8 Solr 12.8.1 索引集分片劃分建議 操作場景 該操作指導系統管理員通過調整Solr的實例數和索引集(Collection)的分片數(Shard)來提高Solr的索引效能。 操作步驟 調整Solr的實例數 如有8個數據節點,建議按照如下方案進行規劃: 當索引資料存放在本地磁碟時: SolrServerAdmin角色佔用2個節點,其中每個節點上再部署SolrServerN(N=1~5),共計12個實例; 其餘6個節點,每個節點分別部署SolrServerN(N=1~5),共計30個實例。 共計42個實例 當索引資料存放在HDFS時:SolrServerAdmin角色佔用2個節點,其中每個節點上再部署SolrServerN(N=1~2),共計3個實例; 其餘6個節點,每個節點分別部署SolrServerN(N=1~3),共計18個實例。 共計24個實例 設定每個實例佔用系統最大記憶體為4GB。如果記憶體資源足夠,有利於索引效能提升。 調整記憶體佔用的步驟如下: 1. 登入FusionInsight Manager系統。 2. 單擊“服務管理 > Solr > 服務配置”。 3. 調節每個實例的“SOLR_GC_OPTS”引數,“-Xms2G -Xmx4G”部分表示佔用記憶體大小為2GB~4GB。將此引數值修改為“-Xms4G -Xmx4G”。 4. 單擊“儲存配置”,在彈出的對話方塊中選擇“重新啟動受影響的服務或實例。”重啟服務後生效。 調整Solr索引集的分片數 在建立HDFS資料和HBase資料的全文檢索索引集時(具體操作可參見“業務操作指南 > Solr > 業務常見操作”),建議按照以下方案進行規劃: 設定每個實例對應1個Shard,每個shard包含兩個replica(即副本因子為2),當要求副本數為2時,建立包含21個shard的collection,每個shard兩個副本分 佈於兩個不同節點實例,這樣42個實例分別包含一個replica。 根據以上方案,每個索引集包含42個Shard(以部署42個實例為例)。Shard數量越大,有助於查詢或索引的效能提升,但同時也會增加服務的互動操作,消 耗更多系統資源。 當索引資料存放在本地磁碟時,同樣參考以上規則進行規劃,也就是每個索引集包含24個shard(以部署24個實例為例)。 12.8.2 Solr公共讀寫調優建議 操作場景 該操作指導系統管理員對Solr可進行的公共讀寫效能調優。 前提步驟 已安裝Solr服務的客戶端。 操作步驟 當Solr索引資料存放在本地磁碟或者HDFS時,可以從以下幾個方面進行調優配置: Solr作為全文檢索服務器時,從Solr自身相關配置(JVM、Schema、Solrconfifig)方面進行調優。 1. Schema的配置優化 uniqueKey定義為long型別 說明: long型別的查詢效能優於string型別,如果需要定義為string型別,可以在業務層建立long到string的對映。 建議unqiueKey欄位配置required="true"。 建議uniqueKey欄位配置docValues="true"。 為了獲得更好的查詢效能,建議查詢時顯式地指定返回欄位為uniqueKey欄位。 需要排序、統計的欄位配置為docValues="true",可以有效節省記憶體使用 說明: 配置docValues="true"後,不需要再配置stored="true" 2. 優化索引方案 可以通過修改“solrconfifig.xml”檔案內容來實現以下調優效果。 調優配置項 修改結果 提高索引速度,加大索引執行緒數 <maxIndexingThreads>${solr.maxIndexingThreads:16} </maxIndexingThreads> 增大文件索引快取 <ramBufffferSizeMB>1024</ramBufffferSizeMB> 增大索引段合併因子 <mergeFactor>20</mergeFactor> 加大索引自動硬提交時間 <maxTime>${solr.autoCommit.maxTime:30000}</maxTime> 增大索引的自動軟提交時間 <maxTime>${solr.autoSoftCommit.maxTime:60000}</maxTime> 基於docValues獲取uniqueKey的值 <useDocValueGetField>true</useDocValueGetField> 對docId進行排序,優先順序讀取磁碟 <sortDocIdBeforeGetDoc>true</sortDocIdBeforeGetDoc> 快取docId,避免二次讀取磁碟 <useQuickFirstMatch>true</useQuickFirstMatch> 說明: useDocValueGetField的使用場景:返回欄位(flfl)為uniqueKey uniqueKey為Numberic型別(long/int/flfloat/double) uniqueKey配置docValues=true useQuickFirstMatch的使用場景: 索引入庫後不再更改(刪除/合併操作等) 3. 優化查詢方案 快取在Solr中充當了一個非常重要的角色,Solr中主要包括以下3種快取: Filter cache(過濾器快取),用於儲存過濾器(fq引數)和層面搜尋的結果; Document cache(文件快取),用於儲存lucene文件儲存的欄位; Query result(查詢快取),用於儲存查詢的結果。 說明: Solr中還包含lucene內部快取,該快取使用者無法調控。 通過調整這3種快取,可以對Solr的搜尋實例進行調優。在調整引數前,需要事先得到Solr實例中的以下資訊: 索引中文件的數量:單擊“服務管理 > Solr”,在“Solr WebUI”下單擊任意“SolrServerAdmin(XX)”進入Solr Admin頁面。在“Core Selector”中選擇 索引資料集Collection下的任意一個core,單擊“Query > Execute Query”,“numFound”項為該引數值; 過濾器的數量:使用者期望值(如:200); 一次查詢返回最大的文件數量:使用者期望值(如:100); 不同查詢和不同排序的個數:使用者期望值(如:500); 每次查詢欄位數:使用者期望值(如:3); 實例查詢的併發數:使用者期望值(如:10)。 可以通過修改“solrconfifig.xml”檔案內容(修改方法參考3)配置快取: 快取型別 修改方案 過濾器快取 <fifilterCache class="solr.FastLRUCache" size="200" initialSize="200" autowarmCount="50"/> “size”和“initialSize”值為快取document id的數量。 “autowarmCount”為“initialSiz”值的1/4。 根據實際場景合理設定,過大會佔用大量記憶體。 查詢結果快取 <queryResultCache class="solr.FastLRUCache" size="3000" initialSize="3000" autowarmCount="750"/> “size”和“initialSize”的值 = 不同查詢和不同排序的個數×每次查詢 欄位數×2。 “autowarmCount”值為“initialSize”值的1/4。 文件快取 <documentCache class="solr.FastLRUCache" size="1000" initialSize="1000"/> “size”和“initialSize”的值 = 一次查詢返回最大的文件數量×實例查詢的併發 數。 12.8.3 Solr over HBase調優建議 操作場景 該操作指導系統管理員在使用Solr over HBase相關功能時,對環境配置進行調優。 前提條件 已成功安裝HDFS、Solr、Yarn、HBase服務。 操作步驟 使用Solr over HBase相關功能時,可以從以下幾個方面進行調優配置: 作業系統優化 如果Solr索引在HDFS上,參考《Solr over HDFS調優建議》章節操作步驟中作業系統優化小節進行配置。 實時索引相關調優建議 1. 修改collection配置集的“solrconfifig.xml”配置檔案中,<autoSoftCommit>配置項,根據使用場景盡量設定大一些,設定越大索引效率越高。 2. 修改HBase服務配置,然後重啟HBase服務: “replication.source.nb.capacity”:5000(HBase叢集每次向HBaseIndexer傳送的entry最大的個數,推薦5000。可根據叢集規模做出適當調整,根據 HBaseIndexer部署情況適當增大)。 “replication.source.size.capacity”:4194304(HBase每次向HBaseIndexer傳送的entry包的最大值大小,不推薦過大)。 3. 修改HDFS服務配置,然後重啟HDFS服務:“hadoop.rpc.protection”:authentication(關閉資料傳輸加密,預設為privacy)。 “ipc.server.handler.queue.size”:隊列中允許的每個處理程式可處理的呼叫數,根據叢集環境適當調整。 “dfs.namenode.handler.count”:NameNode的服務器執行緒數,根據叢集環境適當調整。 “dfs.namenode.service.handler.count”:NameNode的服務器執行緒數,根據叢集環境適當調整。 “dfs.datanode.handler.count”:DataNode的服務執行緒數,根據叢集環境適當調整。 4. 修改HBaseIndexer服務配置,然後重啟HBaseIndexer實例: “hbaseindexer.indexer.threads”:50(預設值為20,HBaseIndexer實例進行索引操作時啟動的併發執行緒數量)。 調整 “GC_OPTS”至4G,記憶體空間充足,可以考慮適當增加。 批量索引、增量索引相關調優建議 1. SolrServer的GC引數配置(如果記憶體充足可以考慮增大):-Xmx8G -Xms8G 2. Yarn的配置,修改後重啟Yarn服務: “mapreduce.reduce.memory.mb”:8192(根據節點配置進行修改) “yarn.resourcemanager.scheduler.class”:org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler 3. 修改HBaseIndexer的配置檔案,進入“Solr客戶端的安裝目錄/hbase-indexer/”,執行一下命令修改配置檔案: vi /opt/client/Solr/hbase-indexer/conf/hbase-indexer-site.xml 設定“hbase-indexer-site.xml”部分以下幾個引數的引數值(如某些引數項不存在,可手動新增): 表12­21 “hbase­indexer­site.xml”配置檔案修改 引數(name) 引數值(value) solr.record.writer.batch.size 500 solr.record.writer.max.queues.size 300 solr.record.writer.num.threads 5 solr.record.writer.maxSegments 5 vi /opt/client/Solr/hbase-indexer/conf/yarn-site.xml 設定“yarn-site.xml”部分以下幾個引數的引數值(如某些引數項不存在, 可手動新增): 表12­22 “yarn­site.xml”配置檔案修改 引數(name) 引數值(value) mapreduce.map.speculative false mapreduce.reduce.speculative false 12.8.4 Solr over HDFS調優建議 操作場景 該操作指導系統管理員在使用Solr over HDFS相關功能時,對環境配置進行調優。 前提條件 已成功安裝HDFS、Solr、Yarn服務,且FusionInsight Manager頁面內Solr服務的服務配置引數“INDEX_STORED_ON_HDFS”為“TRUE”。 完成使用Solr over HDFS的準備工作。 操作步驟 使用Solr over HDFS時,可以選擇從以下幾個方面進行調優配置: 磁碟和網路規劃 1. 磁碟劃分的時候要注意,ZooKeeper單獨佔用一個磁碟或一個磁碟分割槽,否則當資料量過大,如果和HDFS共磁碟,頻繁的資料訪問和配置集訪問,會導致 ZooKeeper停止響應。 2. HDFS採用多塊單盤或者多個RAID0分別掛載多個數據目錄,MR任務處理資料量很大時磁碟IO會成為瓶頸。 3. 網路組網模式,當前環境為多網絡卡繫結模式bond0,網路組網模式可根據實際情況而定,不一定要求為多網絡卡繫結模式。 作業系統優化 每臺主機上都要執行,由於Solr在執行讀取HDFS時,會生成很多個臨時埠(連線本地DataNode。登入FusionInsight Manager,單擊“服務管理 > HDFS > 服務配置”,檢視“dfs.datanode.port”,其值為“DataNode Port”。執行netstat -anp | grep DataNode Port | wc -l命令,結果大於4096時。),出現 TIME_WAIT,最終導致任務失敗,為避免此種情況,需要如下設定 使用PuTTY客戶端,登入每個節點,進入Solr客戶端所在目錄,執行以下命令: vi /etc/sysctl.conf 新增以下內容: net.ipv4.tcp_syncookies = 1 net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1 net.ipv4.tcp_fifin_timeout = 30net.ipv4.tcp_timestamps = 1 net.ipv4.tcp_tw_recycle = 1 儲存退出後,執行 sysctl -p Solr實例部署,索引存放到HDFS上時,確保所有Solr實例同DataNode部署在相同的節點上。 對Solr提交的MapReduce任務的效能優化,請參考節點配置調優和JVM引數優化。 可參考如下建議,設定Yarn服務的配置引數,然後重啟Yarn服務: “mapreduce.task.timeout”:1800000(原始值為60000,處理大資料量時可以適當調大)。 “yarn.nodemanager.resource.cpu-vcores”:24(原始值為8,該值在處理大數量時可設定為當前節點總的CPU個數的1~2倍)。 “yarn.nodemanager.resource.memory-mb”:該引數最好對每個nodemanaegr分別配置,看一下主機介面每個節點的記憶體使用率,每個nodemanager該參 數配置為:空閒的記憶體數減去8G,或者為總記憶體的75%。 Solr使用“HDFSDirectoryFactory”索引方式讀寫HDFS上的索引檔案時,從Solr作為HDFS的Client方面的配置進行調優。 從Solr作為HDFS的Client方面的配置進行調優時,如果Solr和其他元件要部署在相同的節點上,建議每個節點上只部署一個Solr實例。 調整HDFS快取提高索引效能。Solr中HDFS相關快取,通常分配系統上可用記憶體量的10-20%。例如,在有128GB記憶體的主機上運行HDFS和Solr時,通常分 配12.8GB~25.6GB的記憶體作為HDFS相關快取。隨著索引大小的增加,需要調整此引數以保持最佳效能。該引數可通過修改“solrconfifig.xml”檔案進行配置。 請按照以下步驟,調整分配的快取大小: 1. 使用PuTTY工具,以root使用者登入Solr客戶端所在節點,進入Solr客戶端所在目錄,執行以下命令: source bigdata_env kinit solr 2. 執行以下命令獲取配置檔案集,並開啟“solrconfifig.xml”檔案: solrctl confset --get confWithSchema /home/solr/ vi /home/solr/conf/solrconfifig.xml 3. 修改引數“solr.hdfs.blockcache.slab.count”的值“<int name="solr.hdfs.blockcache.slab.count">${solr.hdfs.blockcache.slab.count:1}</int>”。 其中每個slab的大小為128MB ,18個slab,大約佔用2.3GB記憶體,這是每個分片Shard的配置,如果一個臺主機6個shard,那麼共計佔用13.8GB內 存。 由此可知,在此例中可將改引數值修改為“<int name="solr.hdfs.blockcache.slab.count">${solr.hdfs.blockcache.slab.count:18}</int>”。 4. 執行以下命令,上傳修改過的配置檔案集: solrctl confset --update confWithSchema /home/solr/ 12.9 Spark 12.9.1 Spark Core調優 12.9.1.1 資料序列化 操作場景 Spark支援兩種方式的序列化 : Java原生序列化JavaSerializer Kryo序列化KryoSerializer 序列化對於Spark應用的效能來說,具有很大的影響。在特定的資料格式的情況下,KryoSerializer的效能可以達到JavaSerializer的10倍以上,而對於一些Int之類的基 本型別資料,效能的提升就幾乎可以忽略。 KryoSerializer依賴Twitter的Chill庫來實現,相對於JavaSerializer,主要的問題在於不是所有的Java Serializable物件都能支援,相容性不好,所以需要手動註冊類。 序列化功能用在兩個地方:序列化任務和序列化資料。Spark任務序列化只支援JavaSerializer,資料序列化支援JavaSerializer和KryoSerializer。 操作步驟 Spark程式運行時,在shufflfflffle和RDD Cache等過程中,會有大量的資料需要序列化,預設使用JavaSerializer,通過配置讓KryoSerializer作為資料序列化器來提升序列 化效能。 在開發應用程式時,新增如下程式碼來使用KryoSerializer作為資料序列化器。 實現類註冊器並手動註冊類。 package com.etl.common; import com.esotericsoftware.kryo.Kryo; import org.apache.spark.serializer.KryoRegistrator; public class DemoRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo) { //以下為示例類,請註冊自定義的類 kryo.register(AggrateKey.class); kryo.register(AggrateValue.class); } }您可以在Spark客戶端對spark.kryo.registrationRequired引數進行配置,設定是否需要Kryo註冊序列化。 當引數設定為true時,如果工程中存在未被序列化的類,則會丟擲異常。如果設定為false(預設值),Kryo會自動將未註冊的類名寫到對應的物件中。此操作 會對系統性能造成影響。設定為true時,使用者需手動註冊類,針對未序列化的類,系統不會自動寫入類名,而是丟擲異常,相對比false,其效能較好。 配置KryoSerializer作為資料序列化器和類註冊器。 val conf = new SparkConf() conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "com.etl.common.DemoRegistrator") 12.9.1.2 配置記憶體 操作場景 Spark是記憶體計算框架,計算過程中記憶體不夠對Spark的執行效率影響很大。可以通過監控GC(Garbage Collection),評估記憶體中RDD的大小來判斷記憶體是否變成性 能瓶頸,並根據情況優化。 監控節點程序的GC情況(在客戶端的conf/spark-default.conf配置檔案中,在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置項中新增參 數:"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" ),如果頻繁出現Full GC,需要優化GC。把RDD做Cache操作,通過日誌檢視RDD在記憶體中的大小,如果資料太大,需要改變RDD的儲存級別來優化。 操作步驟 優化GC,調整老年代和新生代的大小和比例。在客戶端的conf/spark-default.conf配置檔案中,在spark.driver.extraJavaOptions和 spark.executor.extraJavaOptions配置項中新增引數:-XX:NewRatio。如," -XX:NewRatio=2",則新生代佔整個堆空間的1/3,老年代佔2/3。 開發Spark應用程式時,優化RDD的資料結構。 使用原始型別陣列替代集合類,如可使用fastutil庫。 避免巢狀結構。 Key盡量不要使用String。 開發Spark應用程式時,建議序列化RDD。 RDD做cache時預設是不序列化資料的,可以通過設定儲存級別來序列化RDD減小記憶體。例如: testRDD.persist(StorageLevel.MEMORY_ONLY_SER) 12.9.1.3 設定並行度 操作場景 並行度控制任務的數量,影響shufflfflffle操作後資料被切分成的塊數。調整並行度讓任務的數量和每個任務處理的資料與機器的處理能力達到最優。 檢視CPU使用情況和記憶體佔用情況,當任務和資料不是平均分佈在各節點,而是集中在個別節點時,可以增大並行度使任務和資料更均勻的分佈在各個節點。增加任務 的並行度,充分利用叢集機器的計算能力,一般並行度設定為叢集CPU總和的2-3倍。 操作步驟 並行度可以通過如下三種方式來設定,使用者可以根據實際的記憶體、CPU、資料以及應用程式邏輯的情況調整並行度引數。 在會產生shufflfflffle的操作函式內設定並行度引數,優先順序最高。 testRDD.groupByKey(24) 在程式碼中配置“spark.default.parallelism”設定並行度,優先順序次之。 val conf = new SparkConf() conf.set("spark.default.parallelism", 24) 在“$SPARK_HOME/conf/spark-defaults.conf”檔案中配置“spark.default.parallelism”的值,優先順序最低。 spark.default.parallelism 24 12.9.1.4 使用廣播變量 操作場景 Broadcast(廣播)可以把資料集合分發到每一個節點上,Spark任務在執行過程中要使用這個資料集合時,就會在本地查詢Broadcast過來的資料集合。如果不使用 Broadcast,每次任務需要資料集合時,都會把資料序列化到任務里面,不但耗時,還使任務變得很大。 1. 每個任務分片在執行中都需要同一份資料集合時,就可以把公共資料集Broadcast到每個節點,讓每個節點在本地都儲存一份。 2. 大表和小表做join操作時可以把小表Broadcast到各個節點,從而就可以把join操作轉變成普通的操作,減少了shufflfflffle操作。 操作步驟 在開發應用程式時,新增如下程式碼,將“testArr”資料廣播到各個節點。 def main(args: Array[String]) { ... val testArr: Array[Long] = new Array[Long](200) val testBroadcast: Broadcast[Array[Long]] = sc.broadcast(testArr) val resultRdd: RDD[Long] = inpputRdd.map(input => handleData(testBroadcast, input)) ... } def handleData(broadcast: Broadcast[Array[Long]], input: String) { val value = broadcast.value ... }12.9.1.5 使用External Shufflfflffle Service提升效能 操作場景 Spark系統在運行含shufflfflffle過程的應用時,Executor程序除了運行task,還要負責寫shufflfflffle資料以及給其他Executor提供shufflfflffle資料。當Executor程序任務過重,導致 觸發GC(Garbage Collection)而不能為其他Executor提供shufflfflffle資料時,會影響任務運行。 External shufflfflffle Service是長期存在於NodeManager程序中的一個輔助服務。通過該服務來抓取shufflfflffle資料,減少了Executor的壓力,在Executor GC的時候也不會 影響其他Executor的任務運行。 操作步驟 1. 登入FusionInsight Manager系統。 2. 單擊“服務管理 > Spark > 服務配置”。在“引數類別”中選擇“全部配置”。 3. 選擇“SparkResource > 預設”,修改以下引數: 表12­23 引數列表 引數 預設值 修改結果 spark.shufflfflffle.service.enabled false true 4. 重啟Spark服務,配置生效。 說明: 如果需要在Spark客戶端用External Shufflfflffle Service功能,需要重新下載並安裝Spark客戶端,具體操作請參見《安裝客戶端》章節。 12.9.1.6 Yarn模式下動態資源排程 操作場景 對於Spark應用來說,資源是影響Spark應用執行效率的一個重要因素。當一個長期運行的服務(比如JDBCServer),若分配給它多個Executor,可是卻沒有任何任務 分配給它,而此時有其他的應用卻資源緊張,這就造成了很大的資源浪費和資源不合理的排程。 動態資源排程就是為了解決這種場景,根據當前應用任務的負載情況,實時的增減Executor個數,從而實現動態分配資源,使整個Spark系統更加健康。 操作步驟 1. 需要先配置External shufflfflffle service。 2. 登入FusionInsight Manager,將“spark.dynamicAllocation.enabled”引數的值設定為“true”,表示開啟動態資源排程功能。預設情況下關閉此功能。 3. 下面是一些可選配置,如表12-24所示。 表12­24 動態資源排程引數 配置項 說明 預設值 spark.dynamicAllocation.minExecutors 最小Executor個數。 0 spark.dynamicAllocation.initialExecutors 初始Executor個數。 spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors 最大Executor個數。 2048 spark.dynamicAllocation.schedulerBacklogTimeout 排程第一次超時時間。 1(s) spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 排程第二次及之後超時時間。 spark.dynamicAllocation.schedulerBacklogTimeout spark.dynamicAllocation.executorIdleTimeout 普通Executor空閒超時時間。 60(s) spark.dynamicAllocation.cachedExecutorIdleTimeout 含有cached blocks的Executor空 閒超時時間。 Integer.MAX_VALUE 說明: 使用動態資源排程功能,必須配置External Shufflfflffle Service。 12.9.1.7 配置程序引數 操作場景 Spark on YARN模式下,有Driver、ApplicationMaster、Executor三種程序。在任務排程和運行的過程中,Driver和Executor承擔了很大的責任,而 ApplicationMaster主要負責container的啟停。 因而Driver和Executor的引數配置對spark應用的執行有著很大的影響意義。使用者可通過如下操作對Spark叢集效能做優化。 操作步驟 1. 配置Driver記憶體。 Driver負責任務的排程,和Executor、AM之間的訊息通訊。當任務數變多,任務平行度增大時,Driver記憶體都需要相應增大。 您可以根據實際任務數量的多少,為Driver設定一個合適的記憶體。 將“spark-defaults.conf”中的“spark.driver.memory”配置項或者“spark-env.sh”中的“SPARK_DRIVER_MEMORY”配置項設定為合適大小。在使用spark-submit命令時,新增“--driver-memory MEM”引數設定記憶體。 2. 配置Executor個數。 每個Executor每個核同時能跑一個task,所以增加了Executor的個數相當於增大了任務的併發度。在資源充足的情況下,可以相應增加Executor的個數,以 提高運行效率。 將“spark-defaults.conf”中的“spark.executor.instance”配置項或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTANCES”配置項設定為合適 大小。您還可以設定動態資源排程功能進行優化,詳情請參見“Yarn模式下動態資源排程”章節。 在使用spark-submit命令時,新增“--num-executors NUM”引數設定Executor個數。 3. 配置Executor核數。 每個Executor多個核同時能跑多個task,相當於增大了任務的併發度。但是由於所有核共用Executor的記憶體,所以要在記憶體和核數之間做好平衡。 將“spark-defaults.conf”中的“spark.executor.cores”配置項或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置項設定為合適大小。 在使用spark-submit命令時,新增“--executor-cores NUM”引數設定核數。 4. 配置Executor記憶體。 Executor的記憶體主要用於任務執行、通訊等。當一個任務很大的時候,可能需要較多資源,因而記憶體也可以做相應的增加;當一個任務較小運行較快時,就 可以增大併發度減少記憶體。 將“spark-defaults.conf”中的“spark.executor.memory”配置項或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置項設定為合適大 小。 在使用spark-submit命令時,新增“--executor-memory MEM”引數設定記憶體。 示例 在執行spark wordcount計算中。1.6T資料,250個executor。 在預設引數下執行失敗,出現Futures timed out和OOM錯誤。 因為資料量大,task數多,而wordcount每個task都比較小,完成速度快。當task數多時driver端相應的一些物件就變大了,而且每個task完成時executor和 driver都要通訊,這就會導致由於記憶體不足,程序之間通訊斷連等問題。 當把Driver的記憶體設定到4g時,應用成功跑完。 使用JDBCServer執行TPC-DS測試套,預設引數配置下也報了很多錯誤:Executor Lost等。而當配置Driver記憶體為30g,executor核數為2,executor個數為 125,executor記憶體為6g時,所有任務才執行成功。 12.9.1.8 設計DAG 操作場景 合理的設計程式結構,可以優化執行效率。在程式編寫過程中要盡量減少shufflfflffle操作,合併窄依賴操作。 操作步驟 以“同行車判斷”例子講解DAG設計的思路。 資料格式:通過收費站時間、車牌號、收費站編號...... 邏輯:以下兩種情況下判定這兩輛車是同行車: 如果兩輛車都通過相同序列的收費站, 通過同一收費站之間的時間差小於一個特定的值。 該例子有兩種實現模式,其中實現1的邏輯如圖12-2所示,實現2的邏輯如圖12-3所示。 圖12­2 實現1邏輯 實現1的邏輯說明 : 1. 根據車牌號聚合該車通過的所有收費站並排序,處理後資料如下: 車牌號1,[(通過時間,收費站3),(通過時間,收費站2),(通過時間,收費站4),(通過時間,收費站5)] 2. 標識該收費站是這輛車通過的第幾個收費站。(收費站3,(車牌號1,通過時間,通過的第1個收費站)) (收費站2,(車牌號1,通過時間,通過的第2個收費站)) (收費站4,(車牌號1,通過時間,通過的第3個收費站)) (收費站5,(車牌號1,通過時間,通過的第4個收費站)) 3. 根據收費站聚合資料。 收費站1,[(車牌號1,通過時間,通過的第1個收費站),(車牌號2,通過時間,通過的第5個收費站),(車牌號3,通過時間,通過的第2個收費站)] 4. 判斷兩輛車通過該收費站的時間差是否滿足同行車的要求,如果滿足則取出這兩輛車。 (車牌號1,車牌號2),(通過的第1個收費站,通過的第5個收費站) (車牌號1,車牌號3),(通過的第1個收費站,通過的第2個收費站) 5. 根據通過相同收費站的兩輛車的車牌號聚合資料,如下: (車牌號1,車牌號2),[(通過的第1個收費站,通過的第5個收費站),(通過的第2個收費站,通過的第6個收費站),(通過的第1個收費站,通過的第7 個收費站),(通過的第3個收費站,通過的第8個收費站)] 6. 如果車牌號1和車牌號2通過相同收費站是順序排列的(比如收費站3、4、5是車牌1通過的第1、2、3個收費站,是車牌2通過的第6、7、8個收費站)且數量 大於同行車要求的數量則這兩輛車是同行車。 實現1邏輯的缺點 : 邏輯複雜 實現過程中shufflfflffle操作過多,對效能影響較大。 圖12­3 實現2邏輯 實現2的邏輯說明 : 1. 根據車牌號聚合該車通過的所有收費站並排序,處理後資料如下: 車牌號1,[(通過時間,收費站3),(通過時間,收費站2),(通過時間,收費站4),(通過時間,收費站5)] 2. 根據同行車要通過的收費站數量(例子里為3)分段該車通過的收費站序列,如上面的資料被分解成 : 收費站3->收費站2->收費站4, (車牌號1,[收費站3時間,收費站2時間,收費站4時間]) 收費站2->收費站4->收費站5, (車牌號1,[收費站2時間,收費站4時間,收費站5時間]) 3. 把通過相同收費站序列的車輛聚合,如下: 收費站3->收費站2->收費站4,[(車牌號1,[收費站3時間,收費站2時間,收費站4時間]),(車牌號2,[收費站3時間,收費站2時間,收費站4時間]), (車牌號3,[收費站3時間,收費站2時間,收費站4時間])] 4. 判斷通過相同序列收費站的車輛通過相同收費站的時間差是不是滿足同行車的要求,如果滿足則說明是同行車。 實現2的優點如下: 簡化了實現邏輯。 減少了一個groupByKey,也就減少了一次shufflfflffle操作,提升了效能。 12.9.1.9 經驗總結 使用mapPartitions,按每個分割槽計算結果 如果每條記錄的開銷太大,例: rdd.map{x=>conn=getDBConn;conn.write(x.toString);conn.close} 則可以使用MapPartitions,按每個分割槽計算結果,如 rdd.mapPartitions(records => conn.getDBConn;for(item <- records) write(item.toString); conn.close) 使用mapPartitions可以更靈活地操作資料,例如對一個很大的資料求TopN,當N不是很大時,可以先使用mapPartitions對每個partition求TopN,collect結果到本地 之後再做排序取TopN。這樣相比直接對全量資料做排序取TopN效率要高很多。 使用coalesce調整分片的數量 coalesce可以調整分片的數量。coalesce函式有兩個引數:coalesce(numPartitions: Int, shufflfflffle: Boolean = false) 當shufflfflffle為true的時候,函式作用與repartition(numPartitions: Int)相同,會將資料通過Shufflfflffle的方式重新分割槽;當shufflfflffle為false的時候,則只是簡單的將父RDD的多 個partition合併到同一個task進行計算,shufflfflffle為false時,如果numPartitions大於父RDD的切片數,那麼分割槽不會重新調整。 遇到下列場景,可選擇使用coalesce運算元: 當之前的操作有很多fifilter時,使用coalesce減少空運行的任務數量。此時使用coalesce(numPartitions, false),numPartitions小於父RDD切片數。 當輸入切片個數太大,導致程式無法正常運行時使用。 當任務數過大時候Shufflfflffle壓力太大導致程式掛住不動,或者出現linux資源受限的問題。此時需要對資料重新進行分割槽,使用coalesce(numPartitions, true)。 localDir配置 Spark的Shufflfflffle過程需要寫本地磁碟,Shufflfflffle是Spark效能的瓶頸,I/O是Shufflfflffle的瓶頸。配置多個磁碟則可以並行的把資料寫入磁碟。如果節點中掛載多個磁碟,則 在每個磁碟配置一個Spark的localDir,這將有效分散Shufflfflffle檔案的存放,提高磁碟I/O的效率。如果只有一個磁碟,配置了多個目錄,效能提升效果不明顯。 Collect小資料 大資料量不適用collect操作。 collect操作會將Executor的資料傳送到Driver端,因此使用collect前需要確保Driver端記憶體足夠,以免Driver程序發生OutOfMemory異常。當不確定資料量大小時,可 使用saveAsTextFile等操作把資料寫入HDFS中。只有在能夠大致確定資料大小且driver記憶體充足的時候,才能使用collect。 使用reduceByKey reduceByKey會在Map端做本地聚合,使得Shufflfflffle過程更加平緩,而groupByKey等Shufflfflffle操作不會在Map端做聚合。因此能使用reduceByKey的地方盡量使用該算 子,避免出現groupByKey().map(x=>(x._1,x._2.size))這類實現方式。 廣播map代替陣列 當每條記錄需要查表,如果是Driver端用廣播方式傳遞的資料,資料結構優先採用set/map而不是Iterator,因為Set/Map的查詢速率接近O(1),而Iterator是O(n)。 資料傾斜 當資料發生傾斜(某一部分資料量特別大),雖然沒有GC(Gabage Collection,垃圾回收),但是task執行時間嚴重不一致。 需要重新設計key,以更小粒度的key使得task大小合理化。 修改並行度。 優化資料結構 把資料按列存放,讀取資料時就可以只掃描需要的列。 使用Hash Shufflfflffle時,通過設定spark.shufflfflffle.consolidateFiles為true,來合併shufflfflffle中間檔案,減少shufflfflffle檔案的數量,減少檔案IO操作以提升效能。最終 檔案數為reduce tasks數目。 12.9.1.10 優化返回查詢結果返回大量資料的場景 操作場景 使用命令提交任務時,如果任務中包含了查詢結果返回大量資料的操作,由於此類操作會在driver端佔用大量記憶體進行資料存放,則存在導致記憶體溢位的風險,因此需 要進行優化來支援該場景。 操作步驟 在客戶端的“spark-defaults.conf”配置檔案中調整如下引數。 表12­25 引數說明 引數 描述 預設值 spark.sql.bigdata.thriftServer.useHdfsCollect driver端讀取資料時的方式,當配置為false時,讀取的資料會存放在記憶體中,適合 資料量不大(資料量小於Driver端設定的記憶體大小)的情況。 如果返回的資料量大(資料量大於Driver端設定的記憶體大小),需要把此值配置成 true,由於先儲存成HDFS檔案,再分批讀取,效能比配置成false慢。 false 12.9.2 SQL和DataFrame調優 12.9.2.1 Spark SQL join優化 操作場景 Spark SQL中,當對兩個表進行join操作時,利用Broadcast特性(見“使用廣播變量”章節),將小表BroadCast到各個節點上,從而轉變成非shufflfflffle操作,提高任務 執行效能。 說明: 這里join操作,只指inner join。 操作步驟 在Spark SQL中進行Join操作時,可以按照以下步驟進行優化。為了方便說明,設表A和表B,且A、B表都有個名為name的列。對A、B表進行join操作。 1. 估計表的大小。根據每次載入資料的大小,來估計表大小。 也可以在Hive的資料庫儲存路徑下直接查看錶的大小。首先在Spark的配置檔案hive-site.xml中,檢視Hive的資料庫路徑的配置,預設 為“/user/hive/warehouse”。Spark服務多實例預設資料庫路徑為“/user/hive/warehouse”,例如“/user/hive1/warehouse”。 <property> <name>hive.metastore.warehouse.dir</name> <value>${test.warehouse.dir}</value> <description></description> </property> 然後通過hadoop命令檢視對應表的大小。如查看錶A的大小命令為: hadoop fs -du -s -h ${test.warehouse.dir}/a 說明: 進行廣播操作,需要至少有一個表不是空表。 2. 配置自動廣播的閾值。 Spark中,判斷表是否廣播的閾值為67108864(即64M)。如果兩個表的大小至少有一個小於64M時,可以跳過該步驟。 自動廣播閾值的配置引數介紹,見表12-26。 表12­26 引數介紹 引數 預設值 描述 spark.sql.autoBroadcastJoinThreshold 67108864 當進行join操作時,配置廣播的最大值;當表的位元組數小於該值時便進行廣播。 當配置為-1時,將不進行廣播。 參見https://spark.apache.org/docs/latest/sql-programming-guide.html 配置自動廣播閾值的方法: 在Spark的配置檔案“spark-defaults.conf”中,設定“spark.sql.autoBroadcastJoinThreshold”的值。其中,<size>根據場景而定,但要求該值至 少比其中一個表大。 spark.sql.autoBroadcastJoinThreshold = <size> 利用Hive CLI命令,設定閾值。在運行Join操作時,提前運行下面語句 SET spark.sql.autoBroadcastJoinThreshold=<size> 其中,<size>根據場景而定,但要求該值至少比其中一個表大。 3. (可選)如下兩種場景,需要執行Analyze命令(ANALYZETABLEtableNameCOMPUTESTATISTICSnoscan;)更新表元資料後進行廣播。 需要廣播的表是分割槽表,新建表且檔案型別為非Parquet檔案型別。 需要廣播的表是分割槽表,更新表資料後。 4. 進行join操作。 這時join的兩個table,至少有個表是小於閾值的。 如果A表和B表都小於閾值,且A表的位元組數小於B表時,則運行B join A,如 SELECT A.name FROM B JOIN A ON A.name = B.name; 否則運行A join B。 SELECT A.name FROM A JOIN B ON A.name = B.name; 5. 使用Executor廣播減少Driver記憶體壓力。 預設的BroadCastJoin會將小表的內容,全部收集到Driver中,因此需要適當的調大Driver的記憶體。記憶體增加的計算公式 為:“spark.sql.autoBroadcastJoinThreshold * the number of broadcast table * 2”。當廣播任務比較頻繁的時候,Driver有可能因為OOM而異常退出。 此時,可以開啟Executor廣播,在客戶端“spark-defaults.conf”檔案中配置Executor廣播引數“spark.sql.bigdata.useExecutorBroadcast”為“true”,減少 Driver記憶體壓力。 表12­27 引數介紹 引數 描述 預設值 spark.sql.bigdata.useExecutorBroadcast 設定為true時,使用Executor廣播,將表資料快取在Executor中,而不是放在Driver 之中,減少Spark Driver記憶體的壓力。 true 參考資訊 小表執行超時,導致任務結束。 預設情況下,BroadCastJoin只允許小表計算5分鐘,超過5分鐘該任務會出現超時異常,而這個時候小表的broadcast任務依然在執行,造成資源浪費。 這種情況下,有兩種方式處理: 調整“spark.sql.broadcastTimeout”的數值,加大超時的時間限制。 降低“spark.sql.autoBroadcastJoinThreshold”的數值,不使用BroadCastJoin的優化。 12.9.2.2 優化資料傾斜場景下的Spark SQL效能 配置場景 在Spark SQL多表Join的場景下,會存在關聯鍵嚴重傾斜的情況,導致Hash分桶後,部分桶中的資料遠高於其它分桶。最終導致部分Task過重,跑得很慢;其它Task 過輕,跑得很快。一方面,資料量大Task運行慢,使得計算效能低;另一方面,資料量少的Task在運行完成後,導致很多CPU空閒,造成CPU資源浪費。通過如下配置項可將部分資料採用Broadcast方式分發,以便均衡Task,提高CPU資源的利用率,從而提高效能。 說明: 未產生傾斜的資料,將採用原有方式進行分桶並運行。 使用約束: 1. 兩表中傾斜的關聯鍵不同。(例如:A join B on A.name = B.name,如果A中name = "zhangsan"傾斜,那麼B中name = "zhangsan"不能傾斜,其他的如 name = "lisi"可以傾斜。) 2. 只支援兩表間的Join。 3. TS(TableScan)到Join之間的操作只支援TS(TableScan)、FIL(Filter)、SEL(Select)。 配置描述 在客戶端的“spark-defaults.conf”配置檔案中調整如下引數。 表12­28 引數說明 引數 描述 預設值 spark.sql.planner.skewJoin 設定是否開啟資料傾斜優化。“true”表示開啟。開啟後,會基於“spark.sql.planner.skewJoin.threshold”參 數識別出傾斜關鍵,系統會將這部分資料採用Broadcast方式,可以避免資料傾斜,提升CPU利用率,從而 提升效能。 false spark.sql.planner.skewJoin.threshold 用於判斷是否存在資料傾斜的閾值。當存在關聯鍵的個數大於該閾值,則存在資料傾斜,該關聯鍵為傾斜 關聯鍵。 100000 12.9.2.3 優化小檔案場景下的Spark SQL效能 配置場景 Spark SQL的表中,經常會存在很多小檔案(大小遠小於HDFS塊大小),每個小檔案預設對應Spark中的一個Partition,也就是一個Task。在很多小檔案場景下, Spark會起很多Task。當SQL邏輯中存在Shufflfflffle操作時,會大大增加hash分桶數,嚴重影響效能。 在小檔案場景下,您可以通過如下配置手動指定每個Task的資料量(Split Size),確保不會產生過多的Task,提高效能。 說明: 當SQL邏輯中不包含Shufflfflffle操作時,設定此配置項,不會有明顯的效能提升。 配置描述 在客戶端的“spark-defaults.conf”配置檔案中調整如下引數。 表12­29 引數說明 引數 描述 預設值 spark.sql.small.fifile.combine 用於設定是否開啟小檔案優化。“true”表示開啟。開啟後,可以避免過多的小Task。 false spark.sql.small.fifile.split.size 合併小檔案後,用於指定單個Task期望的資料量。 單位:Byte 256000000 12.9.2.4 INSERT...SELECT操作調優 操作場景 在以下幾種情況下,執行INSERT...SELECT操作可以進行一定的調優操作。 查詢的資料是大量的小檔案。 查詢的資料是較多的大檔案。 在beeline/JDBCServer模式下使用非spark使用者操作。 操作步驟 可對INSERT...SELECT操作做如下的調優操作。 如果建的是Hive表,將儲存型別設為Parquet,從而減少執行INSERT...SELECT語句的時間。 建議使用spark-sql或者在beeline/JDBCServer模式下使用spark使用者來執行INSERT...SELECT操作,避免執行更改檔案owner的操作,從而減少執行 INSERT...SELECT語句的時間。 說明: 在beeline/JDBCServer模式下,executor的使用者跟driver是一致的,driver是JDBCServer服務的一部分,是由spark使用者啟動的,因此其使用者也是 spark使用者,且當前無法實現在運行時將beeline端的使用者透傳到executor,因此使用非spark使用者時需要對檔案進行更改owner為beeline端的使用者,即 實際使用者。 如果查詢的資料是大量的小檔案將會產生大量map操作,從而導致輸出存在大量的小檔案,在執行重新命名檔案操作時將會耗費較多時間,此時可以通過設定 spark.sql.small.fifile.combine = true來開啟小檔案合併功能來減少輸出檔案數,減少執行重新命名檔案操作的時間,從而減少執行INSERT...SELECT語句的時 間。如果查詢的資料是較多的大檔案,那麼執行該操作的輸出檔案也會很多,當這個量級達到數十萬時,此時可以通過設定spark.sql.small.fifile.combine = true來 開啟小檔案合併功能,同時設定spark.sql.small.fifile.split.size為一個較合理的值,控制輸出檔案大小,減小輸出檔案個數(原則是確保reduce任務能夠充分利 用叢集資源,否則會增加寫檔案的時間),減少執行重新命名檔案的時間,從而減少執行INSERT...SELECT語句的時間。 說明: 上述優化操作並不能解決全部的效能問題,對於以下兩種場景仍然需要較多時間: 對於動態分割槽表,如果其分割槽數非常多,那麼也需要執行較長的時間。 如果查詢的資料為大量的大檔案,那麼即使開啟小檔案合併功能,其輸出檔案也依舊很多,那麼也需要較長的時間。 12.9.2.5 多併發JDBC客戶端連線JDBCServer 操作場景 JDBCServer支援多使用者多併發接入,但當併發任務數量較高的時候,預設的JDBCServer配置將無法支援,因此需要進行優化來支援該場景。 操作步驟 1. 設定JDBCServer的公平排程策略。 Spark預設使用FIFO(First In First Out)的排程策略,但對於多併發的場景,使用FIFO策略容易導致短任務執行失敗。因此在多併發的場景下,需要使用公 平排程策略,防止任務執行失敗。 a. 在Spark中設定公平排程,具體請參考http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application b. 在JDBC客戶端中設定公平排程。 i. 在BeeLine命令行客戶端或者JDBC自定義程式碼中,執行以下語句, 其中PoolName是公平排程的某一個排程池。 SET spark.sql.thriftserver.scheduler.pool=PoolName; ii. 執行相應的SQL命令,Spark任務將會在上面的排程池中運行。 2. 設定BroadCastHashJoin的執行緒池個數。 BroadCastHashJoin使用多執行緒方式廣播表,在多併發場景下,會有多個表同時在多執行緒中,一旦廣播表的個數大於執行緒池個數,任務會出錯,因此需要在 JDBCServer的“spark-defaults.conf”配置檔案中或在命令行中執行setspark.sql.broadcastHashJoin.maxThreadNum=value,調整執行緒池個數。 表12­30 引數描述 引數 描述 預設值 spark.sql.broadcastHashJoin.maxThreadNum 用於BroadcastHashJoin的最大的執行緒池個數,同一時間被廣播 的表的個數應該小於該引數值。 128 3. 設定BroadCastHashJoin的超時時間。 BroadCastHashJoin有超時引數,一旦超過預設的時間,該查詢任務直接失敗,在多併發場景下,由於計算任務搶佔資源,可能會導致BroadCastHashJoin 的Spark任務無法執行,導致超時出現。因此需要在JDBCServer的“spark-defaults.conf”配置檔案中調整超時時間。 表12­31 引數描述 引數 描述 預設值 spark.sql.broadcastTimeout BroadcastHashJoin中廣播表的超時時間,當任務併發數較高的時候,可 以調高該引數值,或者直接配置為負數,負數為無窮大的超時時間。 300(數值型別,實際為五分鐘) 4. 設定串行BroadcastHashJoin。 當併發任務非常重(例如全部完成時間超過2個小時),需要將BroadcastHashJoin設定為串行化,這樣就能去除超時時間對併發任務的影響。但是串行化相 對於並行化,會降低叢集資源的使用率,因此在輕量級任務併發時,不要開啟該配置項。 表12­32 引數描述 引數 描述 預設值 spark.sql.bigdata.useSerialBroadcastHashJoin 是否使用串行方式執行BroadcastHashJoin。串行化 BroadcastHashJoin會降低叢集資源使用率,但對於高併發的重任 務,可以解決超時的困擾。 false 12.9.2.6 Parquet元資料buildScan優化 操作場景 在分割槽表的場景下,會對每個分割槽串行執行buildScan操作來構造RDD,在構造RDD時會隨著分割槽數的增加而增加執行時間。因此,提供並行執行buildScan操作來構造 RDD,從而提升執行效率。 該優化主要是Driver利用多執行緒並行執行buildScan操作來提升效能,因此不適合多session場景,因為在多session場景下有可能造成Driver運行過多的執行緒,從而造成 未知錯誤。 操作步驟 在客戶端的“spark-defaults.conf”配置檔案中調整如下引數。表12­33 引數描述 引數 描述 預設值 spark.sql.sources.parallelBuildScan.threshold 並行執行buildScan操作的分割槽數閾值。 -1 spark.sql.sources.parallelBuildScan.threadNum 並行執行buildScan操作的執行緒數。 2 12.9.2.7 ParquetRelation InputSplits優化 操作場景 當前讀取ParquetRelation型別的資料時每次都會執行getSplits操作,如果要讀取的檔案較多,則耗時較長。因此在第一次構造Relation時讀取全部InputSplits資訊並緩 存,後續只要快取沒被清除,則每次只需從快取中讀取所需的InputSplits資訊,從而提升非第一次查詢的效能。 操作步驟 在客戶端的“spark-defaults.conf”配置檔案調整如下引數。 表12­34 引數描述 引數 描述 預設值 spark.sql.source.inputSplit.useCache 是否開啟快取ParquetRelation的InputSplits資訊功能。 false 說明: 只有在資料檔案數在區間(1000, 100000]之內,且spark.sql.source.inputSplit.useCache=true時才會快取ParquetRelation的InputSplits資訊。 快取ParquetRelation的InputSplits資訊功能只會在第一次構造Relation時(例如第一次執行查詢操作時)生效。如果在第一次構造Relation之後才開啟 快取ParquetRelation的InputSplits資訊功能,那麼可通過執行REFRESH操作來清除對應的Relation快取。 一旦執行過資料匯入,必須執行REFRESH操作來重新整理快取,否則有可能造成執行查詢操作時上報檔案不存在異常。 12.9.2.8 Limit優化 操作場景 在RDD的Partition數過多時(大於或等於1000,例如spark.sql.shufflfflffle.partitions=2000時),建議使用Limit優化,減少掃描的檔案數。 在物理計劃中,Limit分為非末端Limit和末端Limit。非末端Limit優化可以通過“spark.sql.optimize.limit”配置項進行開啟或關閉。末端Limit優化沒有控制開關,預設都 進行優化。使用者可以通過在Spark SQL客戶端中執行explain<SQLstatement>命令檢視物理計劃。 舉例說明: 如下所示,使用explain命令查詢物理計劃,在查詢結果的中間位置顯示的就是非末端Limit,即下面藍色字型標識的部分。 spark-sql> explain select count(*) from (select key, sum(value) from src group by key limit 10) t; == Physical Plan == TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#24L]) TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#27L]) Limit 10 ConvertToSafe TungstenAggregate(key=[key#21], functions=[], output=[]) TungstenExchange hashpartitioning(key#21) TungstenAggregate(key=[key#21], functions=[], output=[key#21]) HiveTableScan [key#21], (MetastoreRelation default, src, None), Statistics(5812) Time taken: 0.119 seconds, Fetched 9 row(s) 如下所示,使用explain命令查詢物理計劃,在查詢結果的最前面顯示的就是末端Limit,即下面藍色字型標識的部分。 spark-sql> explain select key, sum(value) from src group by key limit 10; == Physical Plan == Limit 10 ConvertToSafe TungstenAggregate(key=[key#33], functions=[(sum(cast(value#34 as double)),mode=Final,isDistinct=false)], output=[key#33,_c1#35]) TungstenExchange hashpartitioning(key#33) TungstenAggregate(key=[key#33], functions=[(sum(cast(value#34 as double)),mode=Partial,isDistinct=false)], output=[key#33,currentSum#38]) HiveTableScan [key#33,value#34], (MetastoreRelation default, src, None), Statistics(5812) 操作步驟 在客戶端的“spark-defaults.conf”配置檔案中調整如下引數。 表12­35 引數描述 引數 描述 預設值 spark.sql.optimize.limit 非末端Limit優化開關。開啟本開關,且Limit個數小於等於1000時,優化生效,使非末端 Limit也使用末端Limit的分批讀取的演算法。建議開啟。 false spark.sql.limit.numPartsToTry.fifirst 在Limit場景下第一次讀取的Partition數。在RDD的Partition數過多時,建議開啟。建議 配置區間為[1, min(limit value, 10)]。 -1 spark.sql.limit.numPartsToTry.other 在Limit場景下後續讀取的Partition數。在RDD的Partition數過多時,建議開啟。建議配 置區間為[50, 100]。生效條件是spark.sql.limit.numPartsToTry.fifirst>0,若不配置則默 認使用spark.sql.limit.numPartsToTry.fifirst的引數值。 -112.9.2.9 LimitScan優化 操作場景 使用者場景中,存在快速展示資料的場景,典型的SQL語句如下所示: select col1, col2 from table limit 100; 通過執行包含Limit子句的Spark SQL語句,來查詢表的部分資料,並快速展示,但是在查詢parquet大表時,由於獲取大表分割槽資訊耗時較長,導致無法達到實時查詢 的目的。此時使用者可以開啟LimitScan優化,提高查詢效能,快速展示查詢結果。 操作步驟 在客戶端的“spark-defaults.conf”配置檔案中調整如下引數。 表12­36 引數描述 引數 描述 預設值 spark.sql.limitScan.enabled 是否開啟LimitScan優化。 true spark.sql.limitScan.num Limit個數限制,超過這個數值,則不做LimitScan優化。 5000 使用約束 只適用於Hive命令建的表,不能用於DataSource表。 Select的表字段型別只適用於原生資料型別。如果有複雜資料型別(Array,Struct,Map),則不做LimitScan優化。 Select的欄位必須為表中真實欄位,或者該欄位的別名,例如“select col1, col2 as alias”。 Select欄位中如果有帶算術運算、函式的欄位,則不支援LimitScan優化,例如“select col1*100+100, avg(col2) from table1 limit 100”。 不適用於包含WHERE、HAVING、GROUP BY、CLUSTER BY、DISTRIBUTE BY、SORT BY、ORDER BY子句的Select語句。 若Spark服務端的“hive-site.xml”檔案中“hive.server2.enable.doAs”配置成“true”,則無法使用LimitScan功能。 12.9.2.10 預先Broadcast小表優化 操作場景 開啟預先Broadcast小表優化後,同一SQL語句或不同SQL語句內,存在相同的小表或對相同小表做子查詢時,只需要將小表廣播一次,後續就可以複用快取在記憶體中 的表資料,避免重複廣播,從而提升SQL的效能。 支援預廣播的小表是指小於自動廣播閾值的表,使用者可以通過Spark的配置檔案“spark-defaults.conf”中的“spark.sql.autoBroadcastJoinThreshold”引數指定自動廣 播閾值。 操作步驟 1. 通過如下配置項開啟預廣播小表的功能。 在客戶端的“spark-defaults.conf”配置檔案中調整如下引數。 表12­37 引數描述 引數 描述 預設值 spark.sql.saveBroadcastTables.enabled 是否開啟預先廣播小表的優化功能。 true 在服務端的“spark-defaults.conf”配置檔案中調整如下引數。重啟服務端後才生效。 表12­38 引數描述 引數 描述 預設值 spark.sql.broadcastTables.sizeInBytes Driver中快取的表大小(表在HDFS上的大小,而不是表在記憶體中緩 存的實際內容的大小)的閾值,超過這個閾值則採用FIFO的方式擠出 最先快取的表。單位為Byte。 說明: 這是服務端引數,不能在客戶端通過SET的方式進行設定。 104857600 2. (可選)相關命令介紹。 如果使用者更新分割槽小表,更新後需要手動執行Analyze命令(Text表)或Refresh命令(Parquet表或Orc表)來更新元資料,從而重新快取更新後 的分割槽小表。 Analyze命令 ANALYZETABLEtableNameCOMPUTESTATISTICSnoscan; Refresh命令 RefreshtabletableName; 顯示所有快取的廣播表。 SHOWBROADCASTTABLES 清除所有快取的廣播表。 CLEARBROADCAST只有授予“ADMIN”許可權的使用者才能執行該命令。 執行該命令後,再次運行SQL語句時,會再觸發廣播並將廣播表快取起來。 使用限制 不支援Cache在記憶體中的小表。 不支援手動修改表的totalSize和ModifyTime等元資料資訊。 支援預廣播的小表格式為Text,Parquet和Orc。 支援的Join型別有:Inner Join、Left Join、Right Join、Full Join。 spark-sql中快取的廣播表只支援在單個CLI內共享(spark-sql重啟後需要重新廣播,觸發快取)。 12.9.3 Spark Streaming調優 操作場景 Spark Streaming作為一種mini-batch方式的流式處理框架,它主要的特點是:秒級時延和高吞吐量。因此Spark Streaming調優的目標:在秒級延遲的情景下,提高 Spark Streaming的吞吐能力,在單位時間處理儘可能多的資料。 說明: 本章節適用於輸入資料來源為Kafka的使用場景。 操作步驟 一個簡單的流處理系統由以下三部分元件組成:資料來源 + 接收器 + 處理器。資料來源為Kafka,接受器為Spark Streaming中的Kafka資料來源接收器,處理器為Spark Streaming。 對Spark Streaming調優,就必須使該三個部件的效能都最優化。 資料來源調優 在實際的應用場景中,資料來源為了保證資料的容錯性,會將資料儲存在本地磁碟中,而Spark Streaming的計算結果往往全部在記憶體中完成,資料來源很有可能 成為流式系統的最大瓶頸點。 對Kafka的效能調優,有以下幾個點: 使用Kafka-0.8.2以後版本,可以使用非同步模式的新Producer介面。 配置多個Broker的目錄,設定多個IO執行緒,配置Topic合理的Partition個數。 詳情請參見Kafka開源文件中的“效能調優”部分:http://kafka.apache.org/documentation.html 接收器調優 Spark Streaming中已有多種資料來源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器型別最多,也是最成熟一套接收器。 Kafka包括三種模式的接收器API: KafkaReceiver:直接接收Kafka資料,程序異常後,可能出現數據丟失。 ReliableKafkaReceiver:通過ZooKeeper記錄接收資料位移。 DirectKafka:直接通過RDD讀取Kafka每個Partition中的資料,資料高可靠。 從實現上來看,DirectKafka的效能會是最好的,實際測試上來看,DirectKafka也確實比其他兩個API效能好了不少。因此推薦使用DirectKafka的API實現接 收器。 資料接收器作為一個Kafka的消費者,對於它的配置優化,請參見Kafka開源文件:http://kafka.apache.org/documentation.html 處理器調優 Spark Streaming的底層由Spark執行,因此大部分對於Spark的調優措施,都可以應用在Spark Streaming之中,例如: 資料序列化 配置記憶體 設定並行度 使用External Shufflfflffle Service提升效能 說明: 在做Spark Streaming的效能優化時需注意一點,越追求效能上的優化,Spark Streaming整體的可靠性會越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置為“false”的時候,會明顯減少磁碟的操作,提高效能,但由於缺少WAL機制,會出現異常恢 復時,資料丟失。 因此,在調優Spark Streaming的時候,這些保證資料可靠性的配置項,在生產環境中是不能關閉的。 12.9.4 Spark CBO調優 操作場景 SQL語句轉化為具體執行計劃是由SQL查詢編譯器決定的,同一個SQL語句可以轉化成多種物理執行計劃,如何指導編譯器選擇效率最高的執行計劃,這就是優化器的 主要作用。傳統資料庫(例如Oracle)的優化器有兩種:基於規則的優化器(Rule-Based Optimization,RBO)和基於代價的優化器(Cost-Based Optimization, CBO)。 RBO RBO使用的規則是根據經驗形成的,只要按照這個規則去寫SQL語句,無論資料表中的內容怎樣、資料分佈如何,都不會影響到執行計劃。CBO CBO是根據實際資料分佈和組織情況,評估每個計劃的執行代價,從而選擇代價最小的執行計劃。 目前Spark的優化器都是基於RBO的,已經有數十條優化規則,例如謂詞下推、常量摺疊、投影裁剪等,這些規則是有效的,但是它對資料是不敏感的。導致的一個問 題就是資料表中資料分佈發生變化時,RBO是不感知的,基於RBO生成的執行計劃不能確保是最優的。而CBO的重要作用就是能夠根據實際資料分佈估算出SQL語 句,生成一組可能被使用的執行計劃中代價最小的執行計劃,從而提升效能。 目前CBO主要的優化點是Join演算法選擇。舉個簡單例子,當兩個表做Join操作,如果其中一張原本很大的表經過Filter操作之後結果集小於BroadCast的閾值,在沒有 CBO情況下是無法感知大表過濾後變小的情況,採用的是SortMergeJoin演算法,涉及到大量Shufflfflffle操作,很耗費效能;在有CBO的情況下是可以感知到結果集的變 化,採用的是BroadcastHashJoin演算法,會將過濾後的小表BroadCast到每個節點,轉變為非Shufflfflffle操作,從而大大提高效能。 操作步驟 Spark CBO的設計思路是,基於表和列的統計資訊,對各個操作運算元(Operator)產生的中間結果集大小進行估算,最後根據估算的結果來選擇最優的執行計劃。 1. 設定配置項。 在“spark-defaults.conf”配置檔案中增加配置項“spark.sql.cbo”,將其設定為true,預設為false。 在客戶端執行SQL語句setspark.sql.cbo=true進行配置。 2. 執行統計資訊生成命令,得到統計資訊。 說明: 此步驟只需在運行所有SQL前執行一次。如果資料集發生了變化(插入、更新或刪除),為保證CBO的優化效果,需要對有變化的表或者列再次執行 統計資訊生成命令重新生成統計資訊,以得到最新的資料分佈情況。 表:執行COMPUTESTATSFORTABLEsrc命令計算表的統計資訊,統計資訊包括記錄條數、檔案數和物理儲存總大小。 列: 執行COMPUTESTATSFORTABLEsrcONCOLUMNS命令計算所有列的統計資訊。 執行COMPUTESTATSFORTABLEsrcONCOLUMNS name,age命令計算表中name和age兩個欄位的統計資訊。 當前列的統計資訊支援四種類型:數值型別、日期型別、時間型別和字元串型別。對於數值型別、日期型別和時間型別,統計資訊包括: Max、Min、不同值個數(Number of Distinct Value,NDV)、空值個數(Number of Null)和Histogram(支援等寬、等高直方圖);對於字 符串型別,統計資訊包括:Max、Min、Max Length、Average Length、不同值個數(Number of Distinct Value,NDV)、空值個數 (Number of Null)和Histogram(支援等寬直方圖)。 3. CBO調優 自動優化:使用者根據自己的業務場景,輸入SQL語句查詢,程式會自動去判斷輸入的SQL語句是否符合優化的場景,從而自動選擇Join優化演算法。 手動優化:使用者可以通過DESCFORMATTEDsrc命令檢視統計資訊,根據統計資訊的分佈,人工優化SQL語句。 12.9.5 Carbon效能調優 查詢效能調優 Carbon可以通過調整各種引數來提高查詢效能。大部分引數聚焦於增加並行性處理和更好地使用系統資源。 字典(Dictionary)快取:Carbon採用字典編碼,以提升查詢效能和資料檔案壓縮率。在一個表格中,Carbon會對每個字典編碼列建立字典檔案。這些字典 檔案通過解碼查詢執行結果,由查詢處理器載入到記憶體中。一旦完成載入,這些字典檔案會被儲存在記憶體中,避免被再次從磁碟讀取,以加速查詢執行的速 度。但是,如果表的數量較多,則不能將所有列字典檔案都儲存至物理記憶體中。因此,Carbon對儲存在其記憶體快取中的列字典檔案的數目有限制。該限制值 可通過下列屬性引數進行配置。設定一個較大的值,可在記憶體中快取更多列字典檔案數目,進而提升查詢效能。單位為MB,預設值為0。例如,可配置 為“carbon.max.level.cache.size=[10]”。 Spark Executor數量:Executor是Spark並行性的基礎實體。通過增加Executor數量,叢集中的並行數量也會增加。關於如何配置Executor數量,請參考 Spark資料。 Executor核:每個Executor內,並行任務數受Executor核的配置控制。通過增加Executor核數,可增加並行任務數,從而提高效能。關於如何配置Executor 核數,請參考Spark資料。 HDFS block容量:Carbon通過給不同的處理器分配不同的block來分配查詢任務。所以一個HDFS block是一個分割槽單元。另外,Carbon在Spark驅動器中, 支援全域性block級索引,這有助於減少需要被掃描的查詢block的數量。設定較大的block容量,可提高I/O效率,但是會降低全域性索引效率;設定較小的block 容量,意味著更多的block數量,會降低I/O效率,但是會提高全域性索引效率,同時,對於索引查詢會要求更多的記憶體。 掃描執行緒數量:掃描器(Scanner)執行緒控制每個任務中並行處理的資料塊的數量。通過增加掃描器執行緒數,可增加並行處理的資料塊的數量,從而提高性 能。可使用“carbon.properties”檔案中的“carbon.number.of.cores”屬性來配置掃描器執行緒數。例如,“carbon.number.of.cores = 4”。 Carbon查詢流程 當Carbon首次收到對某個表(例如表A)的查詢任務時,系統會載入表A的索引資料到記憶體中,執行查詢流程。當Carbon再次收到對錶A的查詢任務時,系統則不需要 再載入其索引資料。 在Carbon中執行查詢時,查詢任務會被分成幾個掃描任務。即,基於Carbon資料儲存的HDFS block對掃描任務進行分割。掃描任務由叢集中的執行器執行。掃描任務 可以並行、部分並行,或順序處理,具體採用的方式取決於執行器的數量以及配置的執行器核數。 查詢任務的某些部分可在獨立的任務級上處理,例如select和fifilter。查詢任務的某些部分可在獨立的任務級上進行部分處理,例如group-by、count、distinct count 等。 某些操作無法在任務級上處理,例如Having Clause(分組後的過濾),sort等。這些無法在任務級上處理,或只能在任務級上部分處理的操作需要在叢集內跨執行器 來傳輸資料(部分結果)。這個傳送操作被稱為shufflfflffle。 任務數量越多,需要shufflfflffle的資料就越多,會對查詢效能產生不利影響。 由於任務數量取決於HDFS block的數量,而HDFS block的數量取決於每個block的大小,因此合理選擇HDFS block的大小很重要,需要在提高並行性,進行shufflfflffle操 作的資料量和聚合表的大小之間達到平衡。分割和Executors的關係 如果分割數≤Executor數xExecutor核數,那麼任務將以並行方式運行。否則,某些任務只有在其他任務完成之後才能開始。因此,要確保Executor數xExecutor核數≥ 分割數。同時,還要確保有足夠的分割數,這樣一個查詢任務可被分為足夠多的子任務,從而確保並行性。 配置掃描器執行緒 掃描器執行緒屬性決定了每個分割的資料被劃分的可並行處理的資料塊的數量。如果數量過多,會產生很多小資料塊,效能會受到影響。如果數量過少,並行性不佳,性 能也會受到影響。因此,決定掃描器執行緒數時,最好考慮一個分割內的平均資料大小,選擇一個使資料塊不會很小的值。經驗法則是將單個塊大小(MB)除以250得 到的值作為掃描器執行緒數。 增加並行性還需考慮的重要一點是叢集中實際可用的CPU核數,確保並行計算數不超過實際CPU核數的75%至80%。 CPU核數約等於: 並行任務數x掃描器執行緒數。其中並行任務數為分割數和執行器數x執行器核數兩者之間的較小值。 資料載入效能調優 資料載入效能調優與查詢效能調優差異很大。跟查詢效能一樣,資料載入效能也取決於可達到的並行性。在資料載入情況下,工作執行緒的數量決定並行的單元。因此, 更多的執行器就意味著更多的執行器核數,每個執行器都可以提高資料載入效能。 同時,為了得到更好的效能,可在HDFS中配置如下引數。 表12­39 HDFS配置 引數 建議值 dfs.datanode.drop.cache.behind.reads false dfs.datanode.drop.cache.behind.writes false dfs.datanode.sync.behind.writes false 壓縮調優 Carbon結合少數輕量級壓縮演算法和重量級壓縮演算法來壓縮資料。雖然這些演算法可處理任何型別的資料,但如果資料經過排序,相似值在一起出現時,就會獲得更好的 壓縮率。 Carbon資料載入過程中,資料基於Table中的列順序進行排序,從而確保相似值在一起出現,以獲得更好的壓縮率。 由於Carbon按照Table中定義的列順序將資料進行排序,因此列順序對於壓縮效率起重要作用。如果低基數維度位於左邊,那麼排序後的資料分割槽範圍較小,壓縮效率 較高。如果高基數維度位於左邊,那麼排序後的資料分割槽範圍較大,壓縮效率較低。 記憶體調優 Carbon為記憶體調優提供了一個機制,其中資料載入會依賴於查詢中需要的列。不論何時,接收到一個查詢命令,將會獲取到該查詢中的列,並確保記憶體中這些列有數 據載入。在該操作期間,如果達到記憶體的閾值,為了給查詢需要的列提供記憶體空間,最少使用載入級別的檔案將會被刪除。 12.10 Storm 12.10.1 Storm效能調優 操作場景 通過調整Storm引數設定,可以提升特定業務場景下Storm的效能。 Storm引數入口:在FusionInsight Manager系統中,選擇“服務管理 > Storm > 服務配置”,“引數類別”設定為“全部配置”。 拓撲調優 當需要提升Storm資料量處理效能時,可以通過拓撲調優的操作提高效率。建議在可靠性要求不高的場景下進行優化。 表12­40 調優引數 配置引數 預設值 調優場景 topology.acker.executors null Acker的執行器數量。當業務應用對可靠性要求較低,允許不處理部分資料,可設定引數值 為“null”或“0”,以關閉Acker的執行器,減少流控制,不統計訊息時延,提高效能。 topology.max.spout.pending null Spout訊息快取數,僅在Acker不為0或者不為null的情況下生效。Spout將傳送到下游Bolt的 每條訊息加入到pending隊列,待下游Bolt處理完成並確認後,再從pending隊列移除,當 pending隊列佔滿時Spout暫停訊息傳送。增加pending值可提高Spout的每秒訊息吞吐量, 提高效能,但延時同步增加。 topology.transfer.buffffer.size 32 每個worker程序Disruptor訊息隊列大小,建議在4到32之間,增大訊息隊列可以提升吞吐 量,但延時可能會增加。 RES_CPUSET_PERCENTAGE 80 設定各個節點上的Supervisor角色實例(包含其啟動並管理的Worker程序)所使用的物理 CPU百分比。根據Supervisor所在節點業務量需求,適當調整引數值,優化CPU使用率。 JVM調優 當應用程式需要處理大量資料從而佔用更多的記憶體時,存在worker記憶體大於2GB的情況,推薦使用G1垃圾回收演算法。 表12­41 調優引數 配置引數 預設值 調優場景配置引數 預設值 調優場景 WORKER_GC_OPTS -Xms1G -Xmx1G -XX:+UseG1GC - XX:+PrintGCDetails - Xloggc:artifacts/gc.log - XX:+PrintGCDateStamps - XX:+PrintGCTimeStamps - XX:+UseGCLogFileRotation - XX:NumberOfGCLogFiles=10 - XX:GCLogFileSize=1M - XX:+HeapDumpOnOutOfMemoryError - XX:HeapDumpPath=artifacts/heapdump 應用程式記憶體中需要儲存大量資料,worker程序使用的記憶體大於2G,那麼建議使用 G1垃圾回收演算法,可修改引數值為“-Xms2G -Xmx5G -XX:+UseG1GC”。 12.11 YARN 12.11.1 通過容器可重用性提高任務的完成效率 操作場景 說明: 容器可重用與任務優先順序功能不能同時啟用。如果同時啟用,容器可重用功能可正常使用,任務優先順序設定將會失效。 容器可重用性可以提高任務完成的速度。其優勢如下所示: 對於HBase資料批量載入而言,容器可重用性的特性可作用於一些Map以及Reduce任務,使其能快速完成任務從而縮短HBase資料批量載入的時間。 減少容器排程的時間和初始化的時間。 一旦MapReduce作業被提交。它將分發至Map和Reduce任務中。然後應用管理器(以下簡稱AM)將執行如下操作。 1. AM向資源管理器(RM)申請容器去執行任務。所有容器將一起完成這個請求。 2. RM指定容器,之後AM將會聯絡節點管理器(NM)去啟動容器。 3. 容器啟動完畢,NM從AM拉取並執行任務。 4. 任務執行完畢,運行該任務的容器不會被立即終止,而是嘗試向AM拉取下一個任務。 若容器獲取到新任務,則該容器會自我清空並初始化,以適用於新任務。 若容器不能獲取到新任務,則會請求終止自己的運行。 操作步驟 開啟容器可重用性配置項。 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > Yarn > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱,修改表12-42引數值,然後重 新下載並安裝Yarn客戶端,引數配置生效。或直接在客戶端目錄下修改:如“/opt/client/Yarn/confifig/mapred-site.xml”檔案里修改表12-42引數。 表12­42 容器重用配置 引數 描述 預設值 mapreduce.container.reuse.enabled 該配置項設定為“true”則容器可重用,反之容器不可重用。 false mapreduce.container.reuse.enforce.strict-locality 該配置項指定是否遵循嚴格資料本地化。如果設定為“true”,則只有本地節點 上的任務能被分配到容器。 false 使用約束 容器的重用將受限於job中任務的總數。容器不會被不同的job共享。 容器不能被失敗的任務重用。 設計任務時,應保證一個任務完成後堆疊里不存留任何物件。這對於容器重用中的資料一致性以及記憶體的優化非常重要。 如果容器重用是啟用狀態,job將不會釋放容器給資源管理器,除非所有屬於該job的任務都已完成,這種情況會影響到公平排程的原則。 12.11.2 搶佔任務 操作場景 搶佔任務可精簡隊列中的job運行並提高資源利用率,由ResourceManager的capacity scheduler實現,其簡易流程如下: 1. 假設存在兩個隊列A和B。其中隊列A的capacity為25%,隊列B的capacity為75%。 2. 初始狀態下,任務1傳送給隊列A,此任務需要75%的叢集資源。之後任務2傳送到了隊列B,此任務需要50%的叢集資源。 3. 任務1將會使用隊列A提供的25%的叢集資源,並從隊列B獲取的50%的叢集資源。隊列B保留25%的叢集資源。 4. 啟用搶佔任務特性,則任務1使用的資源將會被搶佔。隊列B會從隊列A中獲取25%的叢集資源以滿足任務2的執行。 5. 當任務2完成後,叢集中存在足夠的資源時,任務1將重新開始執行。 操作步驟引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > YARN > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 表12­43 Preemption配置 引數 描述 yarn.resourcemanager.scheduler.monitor.enable 根據“yarn.resourcemanager.scheduler.monitor.policies”中的策略,啟用新的 scheduler監控。設定為“true”表示啟用監控,並根據scheduler的資訊,啟動搶佔 功能。設定為“false”表示不啟用。 yarn.resourcemanager.scheduler.monitor.policies 設定與scheduler配合的“SchedulingEditPolicy”的類的清單。 yarn.resourcemanager.monitor.capacity.preemption.observe_only 設定為“true”,則執行策略,但是不對叢集資源程序搶佔操作。 設定為“false”,則執行策略,且根據策略啟用叢集資源搶佔的功能。 yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval 根據策略監控的時間間隔,單位為毫秒。 yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill 應用傳送搶佔需求到停止container(釋放資源)的時間間隔,單位為毫秒。 預設情況下,若ApplicationMaster15秒內沒有終止container,ResourceManage 等待15秒後會強制終止。 yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round 在一個週期內能夠搶佔資源的最大的比例。 yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity 叢集中資源總量乘以此配置項的值加上某個隊列(例如隊列A)原有的資源量為資源 搶佔盲區。當隊列A中的任務實際使用的資源超過該搶佔盲區時,超過部分的資源將 會被搶佔。 說明: 設定的值越小越有利於資源搶佔。 yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor 設定搶佔目標,Container只會搶佔所配置比例的資源。 示例,如果設定為0.5,則在 5*“yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill” 時間內,任務會回收所搶佔資源的近95%。即接連搶佔5次,每次搶佔待搶佔資源 0.5,呈幾何收斂,每次的時間間隔 為“yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill” 12.11.3 任務優先順序 操作場景 叢集的資源競爭場景如下: 1. 提交兩個低優先順序的應用Job 1和Job 2。 2. 正在運行中的Job 1和Job 2有部分task處於running狀態,但由於叢集或隊列資源容量有限,仍有部分task未得到資源而處於pending狀態。 3. 提交一個較高優先順序的應用Job 3,此時會出現如下資源分配情況:當Job 1和Job 2中running狀態的task運行結束並釋放資源後,Job 3中處於pending狀態 的task將優先得到這部分新釋放的資源。 4. Job 3完成後,資源釋放給Job 1、Job 2繼續執行。 使用者可以在YARN中配置任務的優先順序。任務優先順序是通過ResourceManager的Capacity Scheduler實現的。 操作步驟 設定引數“yarn.app.priority”或“mapreduce.job.priority”,使用命令行介面或API介面設定任務優先順序。若兩種介面都使用,則考慮設定引數“yarn.app.priority”。 命令行介面。 傳送任務時,新增“-Dyarn.app.priority=<priority>”引數。 <priority>可以設定為: VERY_HIGH HIGH NORMAL LOW VERY_LOW API介面。 使用者也可以使用API配置物件的優先順序。 設定優先順序,可通過Confifiguration.set("yarn.app.priority", <priority>)或Job.setPriority(JobPriority priority)設定。 說明: <priority> 的可替換值參見• 命令行介面。。 12.11.4 節點配置調優操作場景 合理配置大資料叢集的排程器後,還可通過調節每個節點的可用記憶體、CPU資源及本地磁碟的配置進行效能調優。 具體包括以下配置項: 可用記憶體 CPU虛擬核數 物理CPU使用百分比 記憶體和CPU資源的協調 本地磁碟 操作步驟 引數入口: 在FusionInsight Manager系統中,選擇“服務管理 > Yarn > 服務配置”,“引數類別”型別設定為“全部配置”。在搜尋框中輸入引數名稱。 可用記憶體 除了分配給作業系統、其他服務的記憶體外,剩餘的資源應盡量分配給YARN。通過如下配置引數進行調整。 例如,如果一個container預設使用512M,則記憶體使用的計算公式為:512M*container數。 預設情況下,Map或Reduce container會使用1個虛擬CPU核心和1024MB記憶體,ApplicationMaster使用1536MB記憶體。 引數 描述 預設值 yarn.nodemanager.resource.memory-mb 設定可分配給容器的物理記憶體數量。建議配置為>24576(24G) 單位:MB 8192 CPU虛擬核數 建議將此配置設定在邏輯核數的1.5~2倍之間。如果上層計算應用對CPU的計算能力要求不高,可以配置為2倍的邏輯CPU。 引數 描述 預設值 yarn.nodemanager.resource.cpu-vcores 表示該節點上YARN可使用的虛擬CPU個數,預設是8。 目前推薦將該值設值為邏輯CPU核數的1.5~2倍之間。 8 物理CPU使用百分比 建議預留適量的CPU給作業系統和其他程序(資料庫、HBase等)外,剩餘的CPU核都分配給YARN。可以通過如下配置引數進行調整。 引數 描述 預設值 yarn.nodemanager.resource.percentage- physical-cpu-limit 表示該節點上YARN可使用的物理CPU百分比。預設是100,即不進行 CPU控制,YARN可以使用節點全部CPU。該引數只支援檢視,可通過 調整YARN的RES_CPUSET_PERCENTAGE引數來修改本引數值。注 意,目前推薦將該值設為可供YARN叢集使用的CPU百分數。 例如:當前節點除了YARN服務外的其他服務(如HBase、HDFS、Hive 等)及系統程序使用CPU為20%左右,則可以供YARN排程的CPU為1- 20%=80%,即配置此引數為80。 90 本地磁碟 由於本地磁碟會提供給MapReduce寫job執行的中間結果,資料量大。因此配置的原則是磁碟盡量多,且磁碟空間盡量大,單個達到百GB以上規模最好。簡 單的做法是配置和data node相同的磁碟,只在最下一級目錄上不同即可。 說明: 多個磁碟之間使用逗號隔開。 引數 描述 預設值引數 描述 預設值 yarn.nodemanager.log- dirs 日誌存放地址(可配置多個目錄)。 容器日誌的儲存位置。預設值為%{@auto.detect.datapart.nm.logs}。如果有資料分割槽,基於該資料 分割槽生成一個類 似/srv/BigData/hadoop/data1/nm/containerlogs,/srv/BigData/hadoop/data2/nm/containerlogs 的路徑清單。如果沒有資料分割槽,生成預設路徑/srv/BigData/yarn/data1/nm/containerlogs。除了 使用表示式以外,還可以輸入完整的路徑清單,比如/srv/BigData/yarn/data1/nm/containerlogs 或/srv/BigData/yarn/data1/nm/containerlogs,/srv/BigData/yarn/data2/nm/containerlogs。這 樣資料就會儲存在所有設定的目錄中,一般會是在不同的裝置中。為保證磁碟IO負載均衡,最好提供 幾個路徑且每個路徑都對應一個單獨的磁碟。應用程式的本地化後的日誌目錄存在於相對路 徑/application_%{appid}中。單獨容器的日誌目錄,即container_{$contid},是該路徑下的子目錄。 每個容器目錄都含容器生成的stderr、stdin及syslog檔案。要新增目錄,比如新 增/srv/BigData/yarn/data2/nm/containerlogs目錄,應首先刪 除/srv/BigData/yarn/data2/nm/containerlogs下的檔案。之後, 為/srv/BigData/yarn/data2/nm/containerlogs賦予跟/srv/BigData/yarn/data1/nm/containerlogs 一樣的讀寫許可權,再將/srv/BigData/yarn/data1/nm/containerlogs修改 為/srv/BigData/yarn/data1/nm/containerlogs,/srv/BigData/yarn/data2/nm/containerlogs。可 以新增目錄,但不要修改或刪除現有目錄。否則,NodeManager的資料將丟失,且服務將不可用。 【預設值】%{@auto.detect.datapart.nm.logs} 【注意】請謹慎修改該項。如果配置不當,將造成服務不可用。當角色級別的該配置項修改後,所有 實例級別的該配置項都將被修改。如果實例級別的配置項修改後,其他實例的該配置項的值保持不 變。 %{@auto.detect.datapart. yarn.nodemanager.local- dirs 本地化後的檔案的儲存位置。預設值為%{@auto.detect.datapart.nm.localdir}。如果有資料分割槽,基 於該資料分割槽生成一個類 似/srv/BigData/hadoop/data1/nm/localdir,/srv/BigData/hadoop/data2/nm/localdir的路徑清 單。如果沒有資料分割槽,生成預設路徑/srv/BigData/yarn/data1/nm/localdir。除了使用表示式以 外,還可以輸入完整的路徑清單,比如/srv/BigData/yarn/data1/nm/localdir 或/srv/BigData/yarn/data1/nm/localdir,/srv/BigData/yarn/data2/nm/localdir。這樣資料就會存 儲在所有設定的目錄中,一般會是在不同的裝置中。為保證磁碟IO負載均衡,最好提供幾個路徑且每 個路徑都對應一個單獨的磁碟。應用程式的本地化後的檔案目錄存在於相對路徑/usercache/% {user}/appcache/application_%{appid}中。單獨容器的工作目錄,即container_%{contid},是該路 徑下的子目錄。要新增目錄,比如新增/srv/BigData/yarn/data2/nm/localdir目錄,應首先刪 除/srv/BigData/yarn/data2/nm/localdir下的檔案。之後, 為/srv/BigData/hadoop/data2/nm/localdir賦予跟/srv/BigData/hadoop/data1/nm/localdir一樣的 讀寫許可權,再將/srv/BigData/yarn/data1/nm/localdir修改 為/srv/BigData/yarn/data1/nm/localdir,/srv/BigData/yarn/data2/nm/localdir。可以新增目錄, 但不要修改或刪除現有目錄。否則,NodeManager的資料將丟失,且服務將不可用。 【預設值】%{@auto.detect.datapart.nm.localdir} 【注意】請謹慎修改該項。如果配置不當,將造成服務不可用。當角色級別的該配置項修改後,所有 實例級別的該配置項都將被修改。如果實例級別的配置項修改後,其他實例的該配置項的值保持不 變。 % {@auto.detect.datapart.nm 12.11.5 JVM引數優化 操作場景 當叢集資料量達到一定規模後,JVM的預設配置將無法滿足叢集的業務需求,輕則叢集變慢,重則叢集服務不可用。所以需要根據實際的業務情況進行合理的JVM引數 配置,提高叢集效能。 操作步驟 引數入口: Yarn角色相關的JVM引數需要配置在“${HADOOP_HOME}/etc/hadoop”目錄下的“yarn-env.sh”檔案中。 每個角色都有各自的JVM引數配置變量,如表12-44。 表12­44 Yarn相關JVM引數配置變量 變量名 變量影響的角色 YARN_OPTS 該變量中設定的引數,將影響Yarn的所有角色。 YARN_CLIENT_OPTS 該變量中設定的引數,將影響Yarn的Client程序。 YARN_RESOURCEMANAGER_OPTS 該變量中設定的引數,將影響Yarn的ResourceManager。 YARN_HISTORYSERVER_OPTS 該變量中設定的引數,將影響Yarn的HistoryServer。 YARN_TIMELINESERVER_OPTS 該變量中設定的引數,將影響Yarn的TimelineServer。 YARN_NODEMANAGER_OPTS 該變量中設定的引數,將影響Yarn的NodeManager。 YARN_PROXYSERVER_OPTS 該變量中設定的引數,將影響Yarn的ProxyServer。 配置方式舉例: export HADOOP_NAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,N