Spark實踐之join優化
阿新 • • 發佈:2019-01-01
join優化應該是spark相關崗位面試必考的內容。join其實常見的就分為兩類:map-side join 和 reduce-side join。當大表和小表join時,用map-side join能顯著提高效率。
/**
 * Created by shenjiyi on 2015/7/8.
 *
 * Demonstrates two common Spark join strategies on (key, value) text inputs:
 *  - map-side (broadcast) join: collect the small table to the driver, broadcast
 *    it to every executor, and join locally on each partition — no shuffle.
 *  - reduce-side (shuffle) join: a regular `join` that shuffles both sides by key.
 *
 * Expected args: 0 = master URL, 1 = small input path, 2 = large input path,
 *                3 = output path for the broadcast join, 4 = output path for the shuffle join.
 */
package com.test

import com.test.utils.MySparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster(args(0))
      .setAppName("TestJoin")
      .set("spark.speculation", "true")
      .set("spark.default.parallelism", "200")
    val sc = new MySparkContext(conf)

    val input1  = sc.rawTextFile(args(1), "GB18030")
    val input2  = sc.rawTextFile(args(2), "GB18030")
    val output1 = args(3)
    val output2 = args(4)

    // Map-side (broadcast) join: suitable when input1 is small enough to fit in
    // driver/executor memory. Avoids the shuffle entirely.
    // NOTE(review): input1 is split on ',' here but on '\t' in the reduce-side join
    // below, even though both read the same file — one delimiter is likely wrong; confirm
    // against the actual data format.
    val pairs = input1.map { line =>
      val pos = line.indexOf(',')
      (line.substring(0, pos), line.substring(pos + 1))
    }.collectAsMap()
    val broadCastMap = sc.broadcast(pairs)

    input2.map { line =>
      val pos = line.indexOf('\t')
      (line.substring(0, pos), line.substring(pos + 1))
    }.mapPartitions { iter =>
      val small = broadCastMap.value
      for {
        (k, v) <- iter
        if small.contains(k)
      } yield (k, (v, small(k))) // contains() already guarded, so direct apply is safe
    }.saveAsTextFile(output1)

    // Reduce-side (shuffle) join: both sides are partitioned by key and shuffled.
    // Works for two large tables, but costs a full shuffle.
    val pairs2 = input1.map { line =>
      val pos = line.indexOf('\t')
      (line.substring(0, pos), line.substring(pos + 1))
    }
    input2.map { line =>
      val pos = line.indexOf('\t')
      (line.substring(0, pos), line.substring(pos + 1))
    }.join(pairs2).saveAsTextFile(output2)
  }
}