Implementing common Hadoop algorithms in Spark
阿新 · Published: 2019-02-03
import org.apache.spark.{SparkConf, SparkContext}

object MRInSpark {

  /**
   * Finding the maximum and minimum values has long been a classic Hadoop example.
   * Implementing it in Spark gives a feel for how the MapReduce way of thinking
   * carries over to Spark.
   */
  def maxMin = {
    val sconf = new SparkConf().setAppName("maxMinTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(1, 6, 4, 22))
    val max = foo.reduce((a, b) => Math.max(a, b))
    val min = foo.reduce((a, b) => Math.min(a, b))
    println(s"max=$max, min=$min")
    sc.stop()
  }

  /**
   * Averages.
   * Computing the average value for each key is another common example.
   * In Spark this kind of problem is usually handled with combineByKey;
   * see the Spark documentation for the details of its signature.
   * (An alternative reduceByKey-based sketch follows after this listing.)
   */
  def avg = {
    val sconf = new SparkConf().setAppName("avgTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))
    val result = foo.combineByKey(
      // create the per-partition accumulator: (value, 1) — the key has been seen once
      (v) => (v, 1),
      // merge another value of the same key into the existing accumulator
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      // merge accumulators coming from different partitions
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map { case (k, v) => (k, v._1 / v._2.toDouble) }
    result.collect().foreach(println)
    sc.stop()
  }

  // A simpler (but less scalable) variant of the same average using groupByKey.
  def avgTwo = {
    val sconf = new SparkConf().setAppName("avgTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))
    val result = foo.groupByKey().map { case (k, vs) => (k, vs.sum / vs.size.toDouble) }
    result.collect().foreach(println)
    sc.stop()
  }

  /**
   * Top-N is another classic Hadoop example of the MapReduce idea.
   * Here is a concise way to solve it in Spark
   * (a second sketch that avoids groupByKey follows after this listing):
   */
  def topn = {
    val sconf = new SparkConf().setAppName("topnTest").setMaster("local[2]")
    val sc = new SparkContext(sconf)
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("a", 2), ("b", 1), ("b", 4), ("a", 4), ("b", 2)))
    val groupSort = foo.groupByKey().map { case (k, values) =>
      // n is 2 here
      val sortValues = values.toList.sortWith(_ > _).take(2)
      (k, sortValues)
    }
    groupSort.flatMap { case (k, vs) => vs.map(k -> _) }.collect().foreach(println)
    sc.stop()
  }

  def main(args: Array[String]): Unit = {
    // pick the example to run here
    avg
  }
}
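
For comparison, here is a minimal sketch of the same per-key average built on reduceByKey rather than combineByKey: each value is first paired with a count of 1, the (sum, count) pairs are combined per key (the MapReduce combiner pattern), and a final mapValues does the division. The object name AvgByReduce is an illustrative label for this sketch, not part of the original code.

import org.apache.spark.{SparkConf, SparkContext}

object AvgByReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("avgByReduce").setMaster("local[2]"))
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))
    val result = foo
      .mapValues(v => (v, 1))                              // pair each value with a count of 1
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))   // sum values and counts per key
      .mapValues { case (sum, cnt) => sum / cnt.toDouble } // average = sum / count
    result.collect().foreach(println)                      // should print (a,2.0) and (b,5.0)
    sc.stop()
  }
}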
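
The groupByKey version of top-N pulls every value for a key into a single list, which can hurt on heavily skewed keys. As a hedged alternative, the sketch below uses aggregateByKey to keep at most n values per key inside each partition before the shuffle, much like a MapReduce combiner; the object name TopNByAggregate and n = 2 are assumptions made for this example.

import org.apache.spark.{SparkConf, SparkContext}

object TopNByAggregate {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("topnByAggregate").setMaster("local[2]"))
    val n = 2
    val foo = sc.parallelize(List(("a", 1), ("a", 3), ("a", 2), ("b", 1), ("b", 4), ("a", 4), ("b", 2)))
    val topn = foo.aggregateByKey(List.empty[Int])(
      (acc, v) => (v :: acc).sorted(Ordering[Int].reverse).take(n), // within a partition: keep the n largest
      (a, b)   => (a ++ b).sorted(Ordering[Int].reverse).take(n)    // across partitions: merge and keep the n largest
    )
    topn.collect().foreach(println) // should print (a,List(4, 3)) and (b,List(4, 2))
    sc.stop()
  }
}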