spark調優之數據傾斜
1)數據分區的策略:
- 隨機分區:每一個數據分配的任意一個分區的概率是均等的
- Hash分區:使用數據的Hash分區值,%分區數。(導致數據傾斜的原因)
- 範圍分區:將數據範圍劃分,數據分配到不同的範圍中(分布式的全局排序)
2)數據傾斜的原因:
Shuffle數據之後導致數據分布不均勻,但是所有節點的機器的性能都是一樣的,程序也是一樣的,就是數據量不一致,所以決定了task的執行時長就被數據量決定了。
3)定位數據傾斜的代碼:
數據傾斜發生在shuffle過程,有shuffle過程的算子有:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。或者查看哪一個task執行緩慢、內存溢出...
4)查看數據傾斜的key的分布情況:
//使用spark中的抽樣算子sample,查看相應的key的分布 val sampledPairs = pairs.sample(false, 0.1) //抽樣 val sampledWordCounts = sampledPairs.countByKey() sampledWordCounts.foreach(println(_))
(2)數據傾斜的解決方案
1)過濾掉少數數據傾斜的key:
如果發現導致數據傾斜的key是極少數,並且對計算本身影響不大,那麽這種方案比較適用。
實現原理:通過spark的sample算子,定位到數據傾斜的key,然後使用filter算子將其過濾即可。
2)提高shuffle的並行度:
這是一種嘗試性策略:就是提高增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。
3)兩階段的聚合(局部聚合和全局聚合):
適用場景:對RDD執行reduceByKey等這類有聚合操作的shuffle算子或者spark SQL使用使用group by語句進行分組聚合,比較適用。
原理:將原本相同的key通過附加隨機前綴的方式,變成多個不同key,就可以讓原本被一個task處理的數據分散到多個task上做局部聚合,進行解決單個task處理數據量過多的問題。接著去除隨機前綴,再次進行全局的聚合,就可以得到最終的結果。
代碼實現:
object _01SparkDataSkewTwoStageOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val sc = SparkUtil.sparkContext("local[2]", "_01SparkDataSkewTwoStageOps")
val list = List(
"hello you hello hello me",
"hello you hello hello shit",
"oh hello she study"
)
val listRDD = sc.parallelize(list)
val pairsRDD = listRDD.flatMap(line => line.split("\\s+")).map((_, 1))
//step 1 找到發生數據傾斜key
val sampleRDD = pairsRDD.sample(false, 0.6)
val cbk= sampleRDD.countByKey()
// cbkRDD.foreach(println)
val sortedInfo = cbk.toBuffer.sortWith((t1, t2) => t1._2 > t2._2)
val dataSkewKey = sortedInfo.head._1
// sortedInfo.foreach(println)
println("發生了數據傾斜的Key:" + dataSkewKey)
//step 2 給對應的key打上N以內的隨機前綴
val prefixPairsRDD = pairsRDD.map{case (word, count) => {
if(word.equals(dataSkewKey)) {
val random = new Random()
val prefix = random.nextInt(2)//0 1
(s"${prefix}_${word}", count)
} else {
(word, count)
}
}}
prefixPairsRDD.foreach(println)
//step 3 局部聚合
val partAggrInfo = prefixPairsRDD.reduceByKey(_+_)
println("===============>局部聚合之後的結果:")
partAggrInfo.foreach(println)
//step 4 全局聚合
//step 4.1 去掉前綴
val unPrefixPairRDD = partAggrInfo.map{case (word, count) => {
if(word.contains("_")) {
(word.substring(word.indexOf("_") + 1), count)
} else {
(word, count)
}
}}
println("================>去掉隨機前綴之後的結果:")
unPrefixPairRDD.foreach(println)
// step 4.2 全局聚合
val fullAggrInfo = unPrefixPairRDD.reduceByKey(_+_)
println("===============>全局聚合之後的結果:")
fullAggrInfo.foreach(println)
sc.stop()
}
}
4)將reduce join 轉換為map join(大小表):
適用場景:在對RDD使用join操作,或者是在sparksql 中使用join語句的時候,而且join操作中的一個RDD或者表的數據量比較小,此方法適用
實現原理:有reduce join的過程一定有shuffle,有shuffle就可能出現數據的傾斜,所以將reduce join使用map join 代替。如果一個RDD是比較小的,那麽可以使用廣播變量的方式,將小RDD發送到各個worker的executor中,實現本地的連接
代碼實現:
object _02SparkRDDBroadcastOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(s"${_02SparkRDDBroadcastOps.getClass.getSimpleName}")
val sc = new SparkContext(conf)
val stu = List(
"1 鄭祥楷 1",
"2 王佳豪 1",
"3 劉鷹 2",
"4 宋誌華 3",
"5 劉帆 4",
"6 OLDLi 5"
)
val cls = List(
"1 1807bd-bj",
"2 1807bd-sz",
"3 1807bd-wh",
"4 1807bd-xa",
"7 1805bd-bj"
)
/*
使用廣播變量來完成上述操作
一般用戶表都比較大,而班級表相對很小,符合我們在共享變量中提出的第一個假設
所以我們可以嘗試使用廣播變量來進行解決
*/
val stuRDD = sc.parallelize(stu)
//cls-->map---->
val map = cls.map{case line => {
(line.substring(0, line.indexOf(" ")), line.substring(line.indexOf(" ")).trim)
}}.toMap
//map--->broadcast
val clsMapBC:Broadcast[Map[String, String]] = sc.broadcast(map)
stuRDD.map{case line => {
val map = clsMapBC.value
val fields = line.split("\\s+")
val cid = fields(2)
// map.get(cid)
val className = map.getOrElse(cid, "UnKnown")
s"${fields(0)}\t${fields(1)}\t${className}"//在mr中學習到的map join
}}.foreach(println)
sc.stop()
}
5)采樣傾斜的key並拆分join操作(大大表):
適用場景:在hive兩張表進行join的時候,如果兩張表的數據都很大,並且,一張表的數據很均勻,但是另一張表的數據有少量的key數據量過大,此時使用這個解決方案
實現原理:對於join導致的數據傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key分拆成獨立RDD,並附加隨機前綴打散成n份去進行join,此時這幾個key對應的數據就不會集中在少數幾個task上,而是分散到多個task進行join了。
代碼實現:
一張表中:
Id num
1 100W
2 10
3 10
4 10
5 10
可以使用union的方式來啟動多個job並行執行:
//通過分離數據量大key來解決數據傾斜
select count(*) from t_test where id !=1 group by id
union
select count(*) from t_test where id ==1 group by id
6)使用隨機前綴和擴容RDD進行join(大量key的數據傾斜):
適用場景:如果進行join操作時,RDD中有大量的key導致數據傾斜,那麽進行拆分可以也沒有意義,此時使用這種方法
實現原理:這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,因此只能對整個RDD進行數據擴容,對內存資源要求很高。
代碼實現:
左表的連接條件的值,可以在某個範圍內進行隨機,並且這個隨機值有多少個,那麽右表的數據就要復制多少份。
object _03SparkJoinDataSkewOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
val sc = SparkUtil.sparkContext("local[2]", "_03SparkJoinDataSkewOps")
val list1 = List(
"hello 1",
"hello 2",
"hello 3",
"hello 4",
"you 1",
"me 2"
)
val list2 = List(
"hello zhangsan dfsadfasdfsa",
"hello lisi adfasdfasd",
"you wangwu adfasdfs",
"me zhouqi adfadfa"
)
//<key, value>
val listRDD1 = sc.parallelize(list1).map(line => {
val fields = line.split("\\s+")
(fields(0), fields(1))
})
//<key, value>
val listRDD2 = sc.parallelize(list2).map(line => {
val fields = line.split("\\s+")
(fields(0), fields(1))
})
val joinRDD: RDD[(String, (String, String))] = dataSkewRDDJoin(sc, listRDD1, listRDD2)
println("最後進行join的結果:")
joinRDD.foreach(println)
sc.stop()
}
private def dataSkewRDDJoin(sc: SparkContext, listRDD1: RDD[(String, String)], listRDD2: RDD[(String, String)]) = {
//假設listRDD1中的部分key有數據傾斜,所以我在進行join操作的時候,需要進行拆分計算
//step 1 找到發生數據傾斜的key
val dataSkewKeys = listRDD1.sample(false, 0.6).countByKey().toList.sortWith((t1, t2) => t1._2 > t2._2).take(1).map(t => t._1)
println("通過sample算子得到的可能發生數據傾斜的key:" + dataSkewKeys)
//step 2 對listRDD1和listRDD2中的數據按照dataSkewKeys各拆分成兩個部分
//step 2.1 講dataSkewKeys進行廣播
val dskBC = sc.broadcast(dataSkewKeys)
// step 2.2 進行拆分
val dataSkewRDD1 = listRDD1.filter { case (word, value) => {
//有數據傾斜的rdd--->dataskewRDD1
val dsks = dskBC.value
dsks.contains(word)
}
}
val commonRDD1 = listRDD1.filter { case (word, value) => {
//沒有數據傾斜的rdd--->commonRDD1
val dsks = dskBC.value
!dsks.contains(word)
}
}
val dataSkewRDD2 = listRDD2.filter { case (word, value) => {
//有數據傾斜的rdd--->dataskewRDD1
val dsks = dskBC.value
dsks.contains(word)
}
}
val commonRDD2 = listRDD2.filter { case (word, value) => {
//沒有數據傾斜的rdd--->commonRDD1
val dsks = dskBC.value
!dsks.contains(word)
}
}
}
//step 3 對dataskewRDD進行添加N以內隨機前綴
// step 3.1 添加隨機前綴
val prefixDSRDD1:RDD[(String, String)] = dataSkewRDD1.map { case (word, value) => {
val random = new Random()
val prefix = random.nextInt(2)
(s"${prefix}_${word}", value)
}
}
// step 3.2 另一個rdd進行擴容
val prefixDSRDD2:RDD[(String, String)] = dataSkewRDD2.flatMap { case (word, value) => {
val ab = ArrayBuffer[(String, String)]()
for (i <- 0 until 2) {
ab.append((s"${i}_${word}", value))
}
ab
}
}
println("---->有數據傾斜RDD1添加前綴成prefixDSRDD1的結果:" + prefixDSRDD1.collect().mkString(","))
println("---->有數據傾斜RDD2擴容之後成prefixDSRDD2的結果:" + prefixDSRDD2.collect().mkString(","))
// step 4 分步進行join操作
// step 4.1 有數據傾斜的prefixDSRDD1和prefixDSRDD2進行join
val prefixJoinDSRDD = prefixDSRDD1.join(prefixDSRDD2)
//ste 4.2 無數據傾斜的commonRDD1和commonRDD2進行join
val commonJoinRDD = commonRDD1.join(commonRDD2)
// step 4.3 將隨機前綴去除
val dsJionRDD = prefixJoinDSRDD.map { case (word, (value1, value2)) => {
(word.substring(2), (value1, value2))
}
}
//step 5 將拆分進行join之後的結果進行union連接,得到最後的結果 sql union all
val joinRDD = dsJionRDD.union(commonJoinRDD)
joinRDD
}
}
本博文參考至美團的spark調優:https://tech.meituan.com/
spark調優之數據傾斜