Spark常見程式設計問題解決辦法及優化
目錄
- 1.資料傾斜
- 2.TopN
- 3.Join優化
- 4.根據HashMap、DF等資料集進行filter
- 5.Join去掉重複的列
- 6.展開NestedDF
- 7.計算session/組內時間差
- 8.用flatMap替代map + filter
- 9.分層抽樣
- 10.SQL與DF API
- 11.Shuffle後的分割槽
- 12.多維分析的優化
1.資料傾斜
來源:讀取資料之後,包括從資料來源讀取和shuffle後讀取
後果:大部分task和小部分task完成時間相差很大、OOM(也有可能時異常資料的問題,需要完善程式碼)。
分析:用sample + countBykey -> /count判斷key的分佈情況。
解決方法:
- 採用map-side聚合的運算元
- 提高並行度repartition
- 先估計分佈,確定哪些key導致傾斜,如果單個key資料不是太大,可以自定義partition為其分割槽;如果單個key資料很大,就多key進行改造。
- join類傾斜:
- 過濾掉業務無關null再join
- 其中一個表小時,廣播join
- 傾斜資料分離:分離出傾斜部分的表,這個表通常不大,此時再廣播join
- 如果單個key過大,那隻能對該key進行改造了,即為key新增一個隨機後序,如0、1、2中的一個,而另一個表則要擴大3倍,每條資料的key分別加上0、1、2的字尾(為保證所有key都被分配0字尾,從而另一個表沒有足夠的資料join)。這裡可以自定義一些UDF來實現對資料分佈的估計和改造key中n,即打散程度,的選擇。
- 資料來源:儘量用可分割檔案儲存資料、repartition
2.TopN
TopN問題可分為4種
- 總體/組間TopN
- DF轉pairRDD後用takeOrdered()。優點:不需要全排;缺點:結果為聚合到Driver的Array,所以不適合N較大的情況。
- DF -> sort -> rdd -> zipWithIndex -> filter(index < n) 。優點:適合N較大的情況,結果仍然是分散式的;缺點:全排,N較小時比上面慢
- DF的sort後take。優點:簡單;缺點:全排,N較大時很慢,甚至會OOM(take會將結果都shuffle到一個partition中)
- 未聚合總體TopN:即資料需要分組聚合後才能比較
- DF.groupBy().agg()然後接上面的“總體/組間TopN”
- 分組/組內TopN
- Aggragator。優點:快;缺點:較複雜
- window function。優點:簡單,適合N較大;缺點:稍慢(window function並不進行map-side聚合,所以shuffle量較大)
// Aggragator例子
class TopNAggregator[K1: TypeTag, K2: TypeTag, V: TypeTag](num: Int, ord: Ordering[(K2, V)])
extends Aggregator[(K1, K2, V), mutable.PriorityQueue[(K2, V)], Array[(K2, V)]] {
override def zero: mutable.PriorityQueue[(K2, V)] = new mutable.PriorityQueue[(K2, V)]()(ord)
override def reduce(q: mutable.PriorityQueue[(K2, V)],
a: (K1, K2, V)): mutable.PriorityQueue[(K2, V)] = {
if (q.size < num) {
q += ((a._2, a._3))
} else {
q += ord.min((a._2, a._3), q.dequeue)
}
}
override def merge(q1: mutable.PriorityQueue[(K2, V)],
q2: mutable.PriorityQueue[(K2, V)]): mutable.PriorityQueue[(K2, V)] = {
q1 ++= q2
while (q1.length > num) {
q1.dequeue()
}
q1
}
override def finish(r: mutable.PriorityQueue[(K2, V)]): Array[(K2, V)] = {
r.toArray.sorted(ord.reverse)
}
override def bufferEncoder: Encoder[mutable.PriorityQueue[(K2, V)]] = {
Encoders.kryo[mutable.PriorityQueue[(K2, V)]]
}
override def outputEncoder: Encoder[Array[(K2, V)]] = ExpressionEncoder[Array[(K2, V)]]()
}
// 使用
val topNAggregator = new TopNAggregator[Int, Int, Float](10, Ordering.by(-_._2))
df.groupByKey()
.agg(topNAggregator.toColumn)
3.Join優化
預排序的join
針對SortMergeJoinExec,在mapper端提前sort。原始碼在Reducer端進行排序,但reducer端的資料不及mapper端均勻,所以排序工作量不一,會導致尾部延遲放大。Map階段會按照key的雜湊值對資料進行重分割槽並按key排序。Reducer只需對來自不同Mapper的資料進行歸併排序。這種機制相當於把Reducer排序的任務分流給Mapper。而由於Mapper的資料量往往是比較均勻的,所以排序的效能會優於Reducer。
待考證:如果直接處理RDD,對兩個需要join的RDD呼叫 repartitionAndSortWithinPartitions 然後join
cross join
當每條資料都需要和其餘的每條資料進行計算時,例如計算相似度矩陣,下面的方法進行crossjoin能夠大大減小其中間結果。實驗時直接crossjoin能產生3G以上的資料,應用此方法則只有幾十M。
val ready2Crossjoin = movieFeatures.as[(Int, Array[Float])]
.mapPartitions(_.grouped(4096))
implicit val ordering = new Ordering[(Int, Float)] {
def compare(x: (Int, Float), y: (Int, Float)): Int = {
val compare2 = x._2.compareTo(y._2)
if (compare2 != 0) return -compare2
0
}
}
val ratings = ready2Crossjoin.crossJoin(ready2Crossjoin)
.as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
.flatMap {
case (mf1Iter, mf2Iter) =>
val m1 = mf1Iter.size
val m2 = math.min(mf2Iter.size, 100)
var i = 0
val output = new Array[(Int, Int, Float)](m1 * m2)
val pq = mutable.PriorityQueue[(Int, Float)]()
val vectorOp = new F2jBLAS
mf1Iter.foreach { case (m1Id, mf1Factor) =>
mf2Iter.foreach { case (m2Id, mf2Factor) =>
if (m1Id == m2Id) {
// do nothing
} else {
val simScore = consinSim(ALSRank, vectorOp, mf1Factor, mf2Factor)
if (pq.length < m2) {
pq.enqueue((m2Id, simScore))
} else {
val temp = pq.dequeue()
pq += (if (temp._2 > simScore) temp else (m2Id, simScore))
}
}
}
pq.foreach { case (mf2Id, score) =>
output(i) = (m1Id, mf2Id, score)
i += 1
}
pq.clear()
}
output.toSeq
}
private def consinSim(rank: Int, operator: F2jBLAS, movie1: Array[Float], movie2: Array[Float]): Float = {
operator.sdot(rank, movie1,1, movie2, 1) / operator.snrm2(rank, movie1,1) * operator.snrm2(rank, movie2,1)
}
考慮Join順序
Spark SQL的CBO尚未成熟,不能對SQL中的join的順序做智慧調整。順序的確定需要對資料表的分佈有所瞭解,從而推斷某些順序能夠產生更少的中間資料,進而提高效率。
4.根據HashMap、DF等資料集進行filter
在HashMap、DF等資料集較小的情況下:
HashMap:廣播map,然後根據contain來filter。適合資料集較小的情況。
DF:提取相應的列後,然後用left_anti。適合比上面資料集稍大的情況。
當資料集很大時,同樣利用上面DF的方法,但去掉broadcast,然Spark自行決定如何join。
// HashMap filter
val BCMap = sc.broadcast(mapForFilter)
val filteredDF = df.filter($"col_name" isin (BCMap.value: _*))
// DF filter
val DFForFilter = df1.select("id")
val filteredDF = df0.join(broadcast(filteredDF), Seq("id"), "left_anti"))
5.Join去掉重複的列
val df = left.join(right, Seq("name"))
6.展開NestedDF
+---+-----------+
| _1| _2|
+---+-----------+
| 1|[2, [3, 4]]|
+---+-----------+
+---+-----+--------+--------+
| _1|_2._1|_2._2._1|_2._2._2|
+---+-----+--------+--------+
| 1| 2| 3| 4|
+---+-----+--------+--------+
implicit class DataFrameFlattener(df: DataFrame) {
def flattenSchema: DataFrame = {
df.select(flatten(Nil, df.schema): _*)
}
protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
case s: StructType => s.fields.flatMap(f => flatten(path :+ f.name, f.dataType))
case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
}
}
veryNestedDF.flattenSchema.show()
7.計算session/組內時間差
val timeFmt = "yyyy-MM-dd HH:mm:ss"
val sessionid2ActionsRDD2 = UserVisitActionDF
.withColumn("action_time", unix_timestamp($"action_time", timeFmt))
.groupBy("session_id")
.agg(min("action_time") as "start",
max("action_time") as "end",
.withColumn("visitLength", $"start" - $"end")
8.用flatMap替代map + filter
df.flatMap(if (filter_condition) Some(result) else None)
9.分層抽樣
// 各種型別抽取10%
val fractions = HashMap(
TYPE1 -> 0.1,
TYPE2 -> 0.1,
TYPE3 -> 0.1
)
val randomSeed = 2L
df.stat.sampleBy("col_name", fractions, randomSeed)
// 如果col_name的資料種類未知,用下面方式得出fractions
df.select("time_period")
.distinct
.map(x=> (x, 0.1))
.collectAsMap
10.SQL與DF API
SQL作為宣告式語言,即只需要指定所需資料的模式就能得到結果。這種語言的程式設計思路容易讓人忽略程式碼的執行順序,從而寫出一些執行效率低的程式碼。儘管Spark有Optimizer優化,但尚未完全成熟,部分SQL語句無法實現filter、aggregation等下推。
DF API是一種函式式的語言,能讓程式設計者注意到執行順序,減小寫出低效程式碼的可能。
11.Shuffle後的分割槽
使用DF時,開啟自動分割槽。
如果適用RDD,則有些shuffle是可以輸入partitioner引數的,這就可以控制shuffle後的分割槽數,一些情況還能避免shuffle。如下面程式碼,rdd2執行reduceByKey的shuffle時使用rdd1的partitioner,那麼之後的rdd3和rdd1的join就不需要shuffle了。
val rdd1Partitioner = rdd1.partitioner match {
case Some(p) => p
case None => new HashPartitioner(rdd1.partitions.length)
}
val rdd3 = rdd2.reduceByKey(rdd1Partitioner, (x, y) => if (x > y) x else y)
rdd3.join(rdd1)
12.多維分析的優化
多維分析,如rollup、cube等的運算元,在Spark內建的是Expand方式,根據選用的運算元一次性開闢足夠的記憶體。如果實現Union方式的二次開發,即讀取一次計算一個維度的結果,然後不斷union這些結果,能在某些情況提升效率。
總體來說,Expand方式適合維度小的多維分析,Union方式適合維度大的多維分析。這是因為Expand方式讀取資料的次數只有一次,但資料會膨脹2n倍,而Union方式會讀取資料2n次。