程式人生 > Spark實踐之join優化

Spark實踐之join優化

join優化應該是spark相關崗位面試必考的內容。join其實常見的就分為兩類：map-side join 和 reduce-side join。當大表和小表join時，用map-side join能顯著提高效率。

/**
 * Created by shenjiyi on 2015/7/8.
 */

package com.test

import com.test.utils.MySparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestJoin {
  /**
   * Demonstrates two RDD join strategies on delimiter-separated key/value text.
   *
   * Expected arguments:
   *   args(0) — Spark master URL
   *   args(1) — path of the small input (build side of the map-side join)
   *   args(2) — path of the large input
   *   args(3) — output path for the map-side join result
   *   args(4) — output path for the reduce-side join result
   */
  def main (args: Array[String]): Unit ={
    val conf = new SparkConf()
      .setMaster(args(0))
      .setAppName("TestJoin")
      // Launch backup copies of straggler tasks.
      .set("spark.speculation", "true")
      .set("spark.default.parallelism", "200")
    val sc = new MySparkContext(conf)

    val input1 = sc.rawTextFile(args(1), "GB18030")
    val input2 = sc.rawTextFile(args(2), "GB18030")
    val output1 = args(3)
    val output2 = args(4)

    // Collect the small table to the driver as key -> value.
    // NOTE(review): input1 is split on ',' here but on '\t' in the
    // reduce-side join below — confirm which delimiter the data actually
    // uses; one of the two is likely wrong.
    val pairs = input1.map { line =>
      val pos = line.indexOf(',')
      (line.substring(0, pos), line.substring(pos + 1))
    }.collectAsMap()

    // Map-side join: suitable when joining a small table with a large one.
    // Broadcast the small table to every executor and join against the
    // large table locally in each partition, avoiding a shuffle entirely.
    val broadCastMap = sc.broadcast(pairs)
    input2.map { line =>
      val pos = line.indexOf('\t')
      (line.substring(0, pos), line.substring(pos + 1))
    }.mapPartitions { iter =>
      val m = broadCastMap.value
      for {
        (k, v) <- iter
        if m.contains(k) // inner-join semantics: drop keys absent from the small table
      } yield (k, (v, m(k))) // the guard above guarantees the key is present
    }.saveAsTextFile(output1)

    // Reduce-side join: a regular shuffle-based RDD join of the two inputs.
    val pairs2 = input1.map { line =>
      val pos = line.indexOf('\t')
      (line.substring(0, pos), line.substring(pos + 1))
    }
    input2.map { line =>
      val pos = line.indexOf('\t')
      (line.substring(0, pos), line.substring(pos + 1))
    }.join(pairs2).saveAsTextFile(output2)

    sc.stop() // release cluster resources held by this application
  }
}