1. 程式人生 > >RDD運算元

RDD運算元

transform運算元

	//定義一個內建陣列
	val arr = Array(1,2,3,4,5)
	//將陣列轉化為rdd
	val rdd1 = sparkContext.parallelize(arr)
	
	1.map
	rdd1.map(x=>x*2)
	rdd1.map(x=>(x,x))
	
	2.filter
	//保留奇數
	rdd1.filter(x=>if(x%2 == 1) true else false)
	
	3.flatmap
	val arr2 = Array("hello a","hello b","hello c")
	rdd2.flatmap(line=>line.split(" "))

	4.mapPartitions
	val rdd1 = sparkContext.parallelize(arr , 3 ) //分割槽數設定為3,預設是2
	rdd1.mapPartitions(x:Iterator[Int]=>{
		//每個分割槽執行一次這些程式碼		
		val newlist:List[Int] = x.toList.map(y=>y*y)
		newlist.toIterator
	})

	5.mapPartitionsWithIndex
	rdd3.mapPartitionsWithIndex((index:Int,data:Iterator[Int])=>{
		println("執行操作的分割槽編號是"+index)
		val newlist:List[Int] = data.toList.map(y=>y*y)
		newlist.toIterator
	})
	6.smaple
		//是否放回,取樣比例,種子數
		rdd1.sample(true,0.1,0)
	
	7.groupbykey//根據key整合
		reducebykey//根據key,把value進行計算,相同的key進行求和
		rdd1.reducebykey((x:Int,y:Int)=> x+y) //就是把value依次相加得到和
		區別參考:https://blog.csdn.net/do_yourself_go_on/article/details/76033252

	8.sortbykey
		排序
		//true升序,false降序
		rdd.sortbykey(true/false)
		sortby
	
	9.aggregatebykey
		groupbykey+aggregate
		aggregate 針對單個元素的rdd
		aggregatebykey 針對key-value形式
		引數
		aggregatebykey(1)(2,3)
		分割槽執行的,執行之後每個區進行合併
		1)初始值
		2)迭代操作,拿rdd中的每個元素和初始值進行合併
		3)分割槽合併邏輯
		
		aggregate求平均值
		//第一個引數可以是元祖,第一個是sum,第二個是個數
		//第二個引數,後面的int合併到前面的元祖中(int,int)
		//第三個引數,相當於不同分割槽的元祖進行合併
		val res = aggregate(0,0)((u:(Int,Int),x:Int)=>(u._1+x,u._2+1),
		(x,y)=>((x._1+y._1),(x._2+y._2)))
		val avg  =  res._1.toDouble/res._2