1. 程式人生 > >Spark如何讀取一些大資料集到本地機器上

Spark如何讀取一些大資料集到本地機器上

最近在使用spark處理分析一些公司的埋點資料,埋點資料是json格式,現在要解析json取特定欄位的資料,做一些統計分析,所以有時候需要把資料從叢集上拉到driver節點做處理,這裡面經常出現的一個問題就是,拉取結果集過大,而驅動節點記憶體不足,經常導致OOM,也就是我們常見的異常:

java.lang.OutOfMemoryError: Java heap space

這種寫法的程式碼一般如下:

//載入HDFS資料
 val rdd=sc.textFile("/data/logs/*")
 
 //在驅動程式獲取結果集
 val datas=ArrayBuffer[String]()
 
 //把所有資料,拉倒驅動端操作
 rdd.collect.foreach(line=>{
 
   datas += line.split('#')(1)  //得到某個欄位
 
 })
 
 sc.stop()

上面的這種寫法,基本原理就是一次性把所有分割槽的資料,全部讀取到driver節點上,然後開始做處理,所以資料量大的時候,經常會出現記憶體溢位情況。

(問題一)如何避免這種情況?

分而治之,每次只拉取一個分割槽的資料到驅動節點上,處理完之後,再處理下一個分資料的資料。

(問題二)如果單個分割槽的資料已經大到記憶體裝不下怎麼辦?

給資料集增加更多的分割槽,讓大分割槽變成多個小分割槽。

(問題三)如果結果集資料大於記憶體的大小怎麼辦?

要麼增加驅動節點的記憶體,要麼給每個分割槽的資料都持久化本地檔案上,不再記憶體中維護

下面來看下關鍵問題,如何修改spark的rdd分割槽數量?

我們知道在spark裡面RDD是資料來源的抽象模型,RDD裡面實際上是把一份大資料來源切分成了多個分割槽資料,然後來並行處理這份大資料集。

預設情況下如果Spark從HDFS上載入資料,預設分割槽個數是按照HDFS的block size來切分的,當然我們在載入的時候可以指定的分割槽個數。

textFile(path,partitionNums)//第二個引數可以指定分割槽個數

如果在載入時不指定分割槽個數,spark裡面還提供了兩個函式來進行重分割槽:

1def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T]
(2def repartition(numPartitions: Int):RDD[T]

接著我們來看下coalesce函式和repartition函式的區別:

通過檢視原始碼得知repartition函式內部實際上是呼叫了coalesce函式第二個引數等於true時的封裝。所以我們重點來關注下coalesce函式即可:

coalesce的第一個引數是修改後的分割槽個數

coalesce的第二個引數是控制是否需要shuffle

舉一個例子:

當前我們RDD的分割槽個數是100:

(1)如果要變成10,應該使用

rdd.coalesce(10,false)

(2)如果要變成300,應該使用

rdd.coalesce(300,true)

(3)如果要變成1,應該使用

rdd.coalesce(1,true)

這裡解釋一下:

分割槽數從多變少,一般是不需要開啟shuffle的,這樣效能最高,因為不需要跨網路混洗資料,當然你也可以開啟shuffle在特定場景下,如分割槽資料極其不均衡。但建議一般不要使用。

分割槽數從少變多,必須開啟shuffle,如果不開啟那麼分割槽資料是不會改變的,由少變多必須得重新混洗資料才能變多,這裡需要注意一點,如果資料量特別少,那麼會有一些分割槽的資料是空。

最後的例子是一種極端場景,如果從多變成1,不開啟shuffle,那麼可能就個別節點計算壓力特別大,叢集資源不能充分利用,所以有必要開啟shuffle,加速合併計算的流程。

明白瞭如何改變rdd的分割槽個數之後,我們就可以文章開頭遇到的問題結合起來,拉取大量資料到驅動節點上,如果整體資料集太大,我們就可以增加分割槽個數,迴圈拉取,但這裡面需要根據具體的場景來設定分割槽個數,因為分割槽個數越多,在spark裡面生成的task數目就越多,task數目太多也會影響實際的拉取效率,在本案例中,從hdfs上讀取的資料預設是144個分割槽,大約1G多點資料,沒有修改分割槽個數的情況下處理時間大約10分鐘,在調整分割槽個數為10的情況下,拉取時間大約在1-2分鐘之間,所以要根據實際情況進行調整。

文章開始前的程式碼優化後的如下:

def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={
    if(seq==idx) ds else Iterator()
  }
------------------------------
//載入HDFS資料
 val rdd=sc.textFile("/data/logs/*")
 
 //在驅動程式獲取結果集
 val datas=ArrayBuffer[String]()
 //重分割槽併合理優化分割槽個數
 val new_rdd=rdd.coalesce(10)
 //得到所有的分割槽資訊
 val parts= new_rdd.partitions
 //迴圈處理每個分割槽的資料,避免導致OOM
    for(p<-parts){
      //獲取分割槽號
      val idx=p.index
    //第二個引數為true,避免重新shuffle,保留原始分割槽資料    
    val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true)
    //讀取結果資料
    val data=parRdd.collect()
    //迴圈處理資料
    for(line<-data){
    datas += line.split('#')(1)  //得到某個欄位
    }
    
      
      }

最後在看下,spark任務的提交命令:

spark-submit  --class  SparkHdfsDataAnalysis 
--conf spark.driver.maxResultSize=1g  
--master yarn  
--executor-cores 5   
--driver-memory 2g  
--executor-memory 3g 
--num-executors 10    
--jars  $jars     spark-analysis.jar  $1 $2

這裡面主要關注引數:

spark.driver.maxResultSize=1g  
driver-memory 2g

單次拉取資料結果集的最大位元組數,以及驅動節點的記憶體,如果在進行大結果集下拉時,需要特別注意下這兩個引數的設定。

參考文件:

有什麼問題可以掃碼關注微信公眾號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。

輸入圖片說明