SparkSql入門案例之三(Spark1.x)
阿新 • • 發佈:2018-11-10
案例一和案例二中是將RDD轉換成DataFrame的方法不同,但是在轉換後都是使用SQL的方式來程式設計的,這裡就用DataFrame API(DSL 特定領域程式語言)的方式來實現
直接上程式碼:
package cn.ysjh0014.SparkSql import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object SparkSqlDemo3 { def main(args: Array[String]): Unit = { //這個程式可以提交到Spark叢集中 val conf = new SparkConf().setAppName("SparkSql3").setMaster("local[4]") //這裡的setMaster是為了在本地執行,多執行緒執行 //建立Spark Sql的連線 val sc = new SparkContext(conf) //SparkContext不能建立特殊的RDD,將Spark Sql包裝進而增強 val SqlContext = new SQLContext(sc) //建立DataFrame(特殊的RDD,就是有schema的RDD),先建立一個普通的RDD,然後再關聯上schema val lines = sc.textFile(args(0)) //將資料進行處理 val RowRdd: RDD[Row] = lines.map(line => { val fields = line.split(",") val id = fields(0).toLong val name = fields(1) val age = fields(2).toInt val yz = fields(3).toDouble Row(id,name,age,yz) }) //結果型別,其實就是表頭,用於描述DataFrame val sm: StructType = StructType(List( StructField("id", LongType, true), StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("yz", DoubleType, true) )) //將RowRDD關聯schema val df: DataFrame = SqlContext.createDataFrame(RowRdd,sm) //使用DataFrame API的方式 //不使用SQL的方式就不用註冊臨時表了 val df1: DataFrame = df.select("id","name","age","yz") //匯入隱式轉換 import SqlContext.implicits._ val df2: Dataset[Row] = df.orderBy($"age" desc,$"yz" asc) //這裡的desc和asc是方法所以必須小寫 df1.show() df2.show() //釋放資源 sc.stop() } }
執行結果: