Spark-core-問題記錄:join shuffle
1、partitionBy:當hashCode為負時,拋異常:java.lang.ArrayIndexOutOfBoundsException,
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
2、rdd.partitionBy(new TdidPartitioner(10)).mapPartitionsWithIndex {
case
Iterator.single(idx, value)
}
A。返回值為Iterator中的value為Iterator,拋異常:NotSerializableException
- object not serializable (class: org.apache.spark.InterruptibleIterator, value: non-empty iterator)
B。把value改為Iterator.single(idx, value.toSet)
3、join於zipPartitions區別
tmpRdd.join(bloomFilterRdd)、tmpRdd.zipPartitions(bloomFilterRdd)
的區別:
A。join會先計算左邊的rdd,然後計算右邊的rdd,根據key join
B。zipPartitions,根據key,一個一個進行匹配
4、persist、以及導致shuffle增長的原因(streaming)
val tmpRDD = Analysis.analyserJoin(tdidRDD, filterRDD).persist()
//filterRDD.unpersist()
filterRDD = tmpRDD
//filterRDD.persist()
filterRDD.count() *** tmpRDD.count() ***
tmpRDD.unpersist()
問題描述:
迴圈使用filterRDD,tdidRDD每次都是不同的資料
A。在呼叫action操作之前,先呼叫unpersist又呼叫persist,運算元會先執行上次操作的計算,因為unpersist把之前的計算釋放,因為所有的計算在呼叫action操作時,才會真正的計算,unpersist則會把RDD標記為不需要persist,並且釋放block塊
B。如果是tmpRDD.count(),也會執行上次filterRDD的計算,因為count操作只是執行了tmpRDD之前的操作,下次用到filterRDD時,需要計算filterRDD的結果
C。每執行一次action操作,都會重新計算一遍,除非使用persisit方法
val tmpRDD = Analysis.analyserZip(tdidRDD, filterRDD)
filterRDD.count()
filterRDD.unpersist()
filterRDD = tmpRDD
*** 使用這種方式,不會導致shuffle增長 ***
5、join操作、co-partitioin
A。按相同的paritioner分割槽後,如果有mapPartitionsWithIndex、mapPartitions 操作,
需要設定preservesPartitioning這個引數,
預設值:preservesPartitioning: Boolean = false
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
B。按相同的paritioner分割槽後,如果map、flatMap、filter 操作,partitioner被置為None
C。streaming例項驗證:
1、當為no-partitioner時,shuffle每批資料都會增長,並且,序列化和反序列化後,shuffle並不下降,
即使重啟系統後,重新讀區序列化後的資料,會在上次shuffleWrite的基礎上,
增加shuffle(filter RDD shuffleWrite + tdidRDD shuffleWrite)全shuffle
結論:BloomFilter的shuffleWrite和shuffleRead的增長,其大小跟put的資料有關,put的資料越大(重複率越小),增量越大
2、當為no-partitioner時,其中一個RDD有partitioner,另一個partitioner為None,shuffle的量為partitioner為None的RDD的資料量
3、當為co-partitioner時,partition相同,shuffle每批增長的量為上批tdidRDD的資料量,當序列化反序列後,shuffle下降為20M左右
4、當為co-partitioner時,partition不同,shuffle的量為partition少的RDD當shuffle
D。單個join,a.join(b),shuffle跟a、b在join左右兩側的位置無關
E。zipPartition,即使沒有相同的partitioner,也沒有shuffle,計算的時候是多個RDD的這個partition算完,再算下一個partition
6、localcheckpoint
A。會打斷RDD的依賴關係,並且把當前的RDD儲存為一個新的RDD
B。當有塊丟失時,會導致系統異常,退出