RDD運算元
阿新 • • 發佈:2018-12-17
transform運算元
//定義一個內建陣列 val arr = Array(1,2,3,4,5) //將陣列轉化為rdd val rdd1 = sparkContext.parallelize(arr) 1.map rdd1.map(x=>x*2) rdd1.map(x=>(x,x)) 2.filter //保留奇數 rdd1.filter(x=>if(x%2 == 1) true else false) 3.flatmap val arr2 = Array("hello a","hello b","hello c") rdd2.flatmap(line=>line.split(" ")) 4.mapPartitions val rdd1 = sparkContext.parallelize(arr , 3 ) //分割槽數設定為3,預設是2 rdd1.mapPartitions(x:Iterator[Int]=>{ //每個分割槽執行一次這些程式碼 val newlist:List[Int] = x.toList.map(y=>y*y) newlist.toIterator }) 5.mapPartitionsWithIndex rdd3.mapPartitionsWithIndex((index:Int,data:Iterator[Int])=>{ println("執行操作的分割槽編號是"+index) val newlist:List[Int] = data.toList.map(y=>y*y) newlist.toIterator }) 6.smaple //是否放回,取樣比例,種子數 rdd1.sample(true,0.1,0) 7.groupbykey//根據key整合 reducebykey//根據key,把value進行計算,相同的key進行求和 rdd1.reducebykey((x:Int,y:Int)=> x+y) //就是把value依次相加得到和 區別參考:https://blog.csdn.net/do_yourself_go_on/article/details/76033252 8.sortbykey 排序 //true升序,false降序 rdd.sortbykey(true/false) sortby 9.aggregatebykey groupbykey+aggregate aggregate 針對單個元素的rdd aggregatebykey 針對key-value形式 引數 aggregatebykey(1)(2,3) 分割槽執行的,執行之後每個區進行合併 1)初始值 2)迭代操作,拿rdd中的每個元素和初始值進行合併 3)分割槽合併邏輯 aggregate求平均值 //第一個引數可以是元祖,第一個是sum,第二個是個數 //第二個引數,後面的int合併到前面的元祖中(int,int) //第三個引數,相當於不同分割槽的元祖進行合併 val res = aggregate(0,0)((u:(Int,Int),x:Int)=>(u._1+x,u._2+1), (x,y)=>((x._1+y._1),(x._2+y._2))) val avg = res._1.toDouble/res._2