Spark troubleshooting 1運算元返回null錯誤 2錯誤持久化以及checkpoint
阿新 • • 發佈:2018-12-26
一、運算元返回為null
問題
在有些運算元函式裡,我們都需要有返回值。但是,有些可能不需要返回值,但是這時候不能直接返回null,返回null將會導致錯誤
Scala.Math(NULL) //異常
解決方法
- 如果不想有返回值,可以在返回的時候,返回一些特殊的值,比如“-999”
- 獲取到rdd之後,對rdd進行filter操作,如果資料是-999的,可以返回false,進行過濾掉
- filter之後,使用coalesce運算元壓縮rdd的partition數量,讓各個partition資料比較緊湊。提升效能。
return actionRDD.mapToPair(new PairFunction<Row, String, Row>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Row> call(Row row) throws Exception { return new Tuple2<String, Row>("-999", RowFactory.createRow("-999")); } });
二、持久化使用方式
有時候希望重複使用一個rdd,不用反覆計算rdd,可以直接使用通過各個節點上的executor 的BlockManager管理記憶體、磁碟資料
但是使用rdd持久化應該是 像下面這樣
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
如果直接像下面
sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
會報錯,file not found錯誤
三、checkpoint
持久化,一般情況可以正常工作,但是可能出現意外,快取在記憶體中的資料莫名其妙丟失,或者儲存在磁碟檔案中的資料,莫名其妙被刪除。
但是有些rdd計算可能非常耗時,rdd之前有大量的父rdd,如果重新計算一個partition,可能就需要重新計算之前所有的父rdd對應的partition,這種情況就可以對rdd進行checkpoint,以防萬一。進行checkpoint,就是說,會將rdd的資料,持久化一份到容錯檔案系統上(比如hdfs)。在對rdd進行計算的時候,如果發現快取資料不見了,就會去checkpoint目錄查詢資料,如果有的話,就直接使用避免重新計算。
這麼理解,checkpoint其實算是 cache的一個備用。如果cache失效了,checkpoint就可以派上用場。
好處是
壞處是:進行checkpoint操作的時候,將rdd資料寫入hdfs中的時候,很耗費效能。
checkpoint原理
- 在程式碼中,用sparkContext,設定一個checkpoint目錄,比如hdfs目錄
JavaSparkContext sc = new JavaSparkContext(conf);
// sc.checkpointFile("hdfs://");
- 在程式碼中,對需要進行checkpoint的rdd,執行rdd.checkpoint()
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY()); // sessionid2actionRDD.checkpoint();
- RDDCheckpointData(spark 內部的api) 接管你的RDD,會標記為marked for checkpoint,準備進行checkpoint
- job執行完之後,會呼叫一個finalRDD.doCheckpoint()方法,順著rdd lineage,回溯掃描,發現有標記為checkpoint的rdd,就會進行二次標記,inProgressCheckpoint,正在接受checkpoint操作
- job執行完後,就會啟動一個內部新的rdd,去將標記為inProgressCheckpoint的rdd的資料,都寫入hdfs檔案中。(備註,如果rdd之前cache過,會直接從快取中獲取資料,寫入hdfs中;如果沒有cache過,那麼就會重新計算一遍這個rdd,再checkpoint)
- 將checkpoint過的rdd之前的依賴rdd,改成一個CheckpointRDD*,強制改變你的rdd的lineage。後面如果rdd的cache資料獲取失敗,直接會通過它的上游CheckpointRDD,去容錯的檔案系統,比如hdfs,中,獲取checkpoint的資料。