1. 程式人生 > >培訓系列10--spark rdd groupbykey的使用

培訓系列10--spark rdd groupbykey的使用

//groupbykey

一、準備資料
val flights=sc.textFile("data/Flights/flights.csv")
val sampleFlights=sc.parallelize(flights.take(1000))
val header=sampleFlights.first
val filteredFlights=sampleFlights.filter(line=>{
line!=header&&line.split(",")(22)!=""
})

這裡的準備資料使用的相對路勁

二、使用map函式獲得自己想要計算的幾個欄位
val airLinesMap=filteredFlights.map(line=>{
val tailNum=line.split(",")(6)
val airline=line.split(",")(4)
(airline,tailNum)
})

三、使用groupbykey操作,合併行
val airlinesGroup= airLinesMap.distinct.groupByKey()
airlinesGroup.take(20).foreach(println)

四、計算每個航空公司的航班,當然也可以不用groupbykey直接使用reducebykey實現

//計算每個航空公司的航班
val airplanesCount =airlinesGroup.map(line=>{
(line._1,line._2.size)
})
airplanesCount.take(20).foreach(println)

五。計算飛機延誤的機率

//計算延誤的機率
val flightsMap=filteredFlights.map(flight=>{
var airline= flight.split(",")(4)
var delay = flight.split(",")(22)
(airline,delay)
})

以上獲得需要計算的相關欄位。

val flightDelays=flightsMap.groupByKey()
val delayChance= flightDelays.map(airline=>{
var count=0
var totalCount =airline._2.size
for (delay<-airline._2){
if(delay.toInt>0){
count+=1
}
}
(airline._1,(count+0.0)/totalCount)
})
delayChance.take(20).foreach(println)

這裡使用了scala的for迴圈,直接把value裡面的list值輸入到一個變數delay裡面去。