Spark如何讀取一些大資料集到本地機器上
最近在使用spark處理分析一些公司的埋點資料,埋點資料是json格式,現在要解析json取特定欄位的資料,做一些統計分析,所以有時候需要把資料從叢集上拉到driver節點做處理,這裡面經常出現的一個問題就是,拉取結果集過大,而驅動節點記憶體不足,經常導致OOM,也就是我們常見的異常:
java.lang.OutOfMemoryError: Java heap space
這種寫法的程式碼一般如下:
//載入HDFS資料
val rdd=sc.textFile("/data/logs/*")
//在驅動程式獲取結果集
val datas=ArrayBuffer[String]()
//把所有資料,拉倒驅動端操作
rdd.collect.foreach(line=>{
datas += line.split('#')(1) //得到某個欄位
})
sc.stop()
上面的這種寫法,基本原理就是一次性把所有分割槽的資料,全部讀取到driver節點上,然後開始做處理,所以資料量大的時候,經常會出現記憶體溢位情況。
(問題一)如何避免這種情況?
分而治之,每次只拉取一個分割槽的資料到驅動節點上,處理完之後,再處理下一個分資料的資料。
(問題二)如果單個分割槽的資料已經大到記憶體裝不下怎麼辦?
給資料集增加更多的分割槽,讓大分割槽變成多個小分割槽。
(問題三)如果結果集資料大於記憶體的大小怎麼辦?
要麼增加驅動節點的記憶體,要麼給每個分割槽的資料都持久化本地檔案上,不再記憶體中維護
下面來看下關鍵問題,如何修改spark的rdd分割槽數量?
我們知道在spark裡面RDD是資料來源的抽象模型,RDD裡面實際上是把一份大資料來源切分成了多個分割槽資料,然後來並行處理這份大資料集。
預設情況下如果Spark從HDFS上載入資料,預設分割槽個數是按照HDFS的block size來切分的,當然我們在載入的時候可以指定的分割槽個數。
textFile(path,partitionNums)//第二個引數可以指定分割槽個數
如果在載入時不指定分割槽個數,spark裡面還提供了兩個函式來進行重分割槽:
(1)def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T]
(2)def repartition(numPartitions: Int):RDD[T]
接著我們來看下coalesce函式和repartition函式的區別:
通過檢視原始碼得知repartition函式內部實際上是呼叫了coalesce函式第二個引數等於true時的封裝。所以我們重點來關注下coalesce函式即可:
coalesce的第一個引數是修改後的分割槽個數
coalesce的第二個引數是控制是否需要shuffle
舉一個例子:
當前我們RDD的分割槽個數是100:
(1)如果要變成10,應該使用
rdd.coalesce(10,false)
(2)如果要變成300,應該使用
rdd.coalesce(300,true)
(3)如果要變成1,應該使用
rdd.coalesce(1,true)
這裡解釋一下:
分割槽數從多變少,一般是不需要開啟shuffle的,這樣效能最高,因為不需要跨網路混洗資料,當然你也可以開啟shuffle在特定場景下,如分割槽資料極其不均衡。但建議一般不要使用。
分割槽數從少變多,必須開啟shuffle,如果不開啟那麼分割槽資料是不會改變的,由少變多必須得重新混洗資料才能變多,這裡需要注意一點,如果資料量特別少,那麼會有一些分割槽的資料是空。
最後的例子是一種極端場景,如果從多變成1,不開啟shuffle,那麼可能就個別節點計算壓力特別大,叢集資源不能充分利用,所以有必要開啟shuffle,加速合併計算的流程。
明白瞭如何改變rdd的分割槽個數之後,我們就可以文章開頭遇到的問題結合起來,拉取大量資料到驅動節點上,如果整體資料集太大,我們就可以增加分割槽個數,迴圈拉取,但這裡面需要根據具體的場景來設定分割槽個數,因為分割槽個數越多,在spark裡面生成的task數目就越多,task數目太多也會影響實際的拉取效率,在本案例中,從hdfs上讀取的資料預設是144個分割槽,大約1G多點資料,沒有修改分割槽個數的情況下處理時間大約10分鐘,在調整分割槽個數為10的情況下,拉取時間大約在1-2分鐘之間,所以要根據實際情況進行調整。
文章開始前的程式碼優化後的如下:
def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={
if(seq==idx) ds else Iterator()
}
------------------------------
//載入HDFS資料
val rdd=sc.textFile("/data/logs/*")
//在驅動程式獲取結果集
val datas=ArrayBuffer[String]()
//重分割槽併合理優化分割槽個數
val new_rdd=rdd.coalesce(10)
//得到所有的分割槽資訊
val parts= new_rdd.partitions
//迴圈處理每個分割槽的資料,避免導致OOM
for(p<-parts){
//獲取分割槽號
val idx=p.index
//第二個引數為true,避免重新shuffle,保留原始分割槽資料
val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true)
//讀取結果資料
val data=parRdd.collect()
//迴圈處理資料
for(line<-data){
datas += line.split('#')(1) //得到某個欄位
}
}
最後在看下,spark任務的提交命令:
spark-submit --class SparkHdfsDataAnalysis
--conf spark.driver.maxResultSize=1g
--master yarn
--executor-cores 5
--driver-memory 2g
--executor-memory 3g
--num-executors 10
--jars $jars spark-analysis.jar $1 $2
這裡面主要關注引數:
spark.driver.maxResultSize=1g
driver-memory 2g
單次拉取資料結果集的最大位元組數,以及驅動節點的記憶體,如果在進行大結果集下拉時,需要特別注意下這兩個引數的設定。
參考文件:
有什麼問題可以掃碼關注微信公眾號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。