13.堆排序
入門與使用參考這一片文件即可:
https://www.cnblogs.com/takemybreathaway/articles/10172339.html
方法(sql使我們定義的sql = new SQLContext(sc)) df是一個DataFrame物件 | 例項說明 |
sql.read.table(tableName) | 讀取一張表的資料 |
df.where(), df.filter() |
過濾條件,相當於sql的where部分; 用法:選擇出年齡欄位中年齡大於20的欄位。 返回值型別:DataFrame df.where("age >= 20"),df.filter("age >= 20") |
df.limit() |
限制輸出的行數,對應於sql的limit 用法:限制輸出一百行 返回值型別:DataFrame df.limit(100) |
df.join() |
連結操作,相當於sql的join 對於join操作,下面會單獨進行介紹 |
df.groupBy() |
聚合操作,相當於sql的groupBy 用法:對於某幾行進行聚合 返回值型別:DataFrame df.groupBy("id") |
df.agg() | 求聚合用的相關函式,下面會詳細介紹 |
df.intersect(other:DataFrame) | 求兩個DataFrame的交集 |
df.except(other:DataFrame) | 求在df中而不在other中的行 |
df.withColumn(colName:String,col:Column) |
增加一列 |
df.withColumnRenamed(exName,newName) | 對某一列的名字進行重新命名 |
df.map(), df.flatMap, df.mapPartitions(), df.foreach() df.foreachPartition() df.collect() df.collectAsList() df.repartition() df.distinct() df.count() |
這些方法都是spark的RDD的基本操作,其中在DataFrame類中也封裝了這些方法,需要注意的是這些方法的返回值是RDD型別的,不是DataFrame型別的,在這些方法的使用上,一定要記清楚返回值型別,不然就容易出現錯誤 |
df.select() |
選取某幾列元素,這個方法相當於sql的select的功能 用法:返回選擇的某幾列資料 返回值型別:DataFrame df.select("id","name") |
以上是兩個都是一寫基本的方法,下面就詳細介紹一下join和agg,na,udf操作
sparkSQL的agg操作
其中sparkSQL的agg是sparkSQL聚合操作的一種表示式,當我們呼叫agg時,其一般情況下都是和groupBy()的一起使用的,選擇操作的資料表為:
1 2 3 4 |
val pSalar = new SQLContext(sc).read.json( "salary.txt" )
val group = pSalar.groupBy( "name" ).agg( "salary" -> "avg" )
val group 2 = pSalar.groupBy( "id" , "name" ).agg( "salary" -> "avg" )
val group 3 = pSalar.groupBy( "name" ).agg(Map( "id" -> "avg" , "salary" -> "max" ))
|
得到的結過如下:
group的結果 group2 group3
使用agg時需要注意的是,同一個欄位不能進行兩次操作比如:agg(Map("salary" -> "avg","salary" -> "max"),他只會計算max的操作,原因很簡單,agg接入的引數是Map型別的key-value對,當key相同時,會覆蓋掉之前的value。同時還可以直接使用agg,這樣是對所有的行而言的。聚合所用的計算引數有:avg,max,min,sum,count,而不是隻有例子中用到的avg