SparkSQL Introductory Example 4 (SparkSQL 2.x)
阿新 · Published: 2018-11-10
The previous examples all covered SparkSQL 1.x programming, so this one covers programming with SparkSQL 2.x.
Going straight to the code, which builds on the earlier examples:
package cn.ysjh0014.SparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSqlTest1 {

  def main(args: Array[String]): Unit = {

    // SparkSQL 2.x programming API (SparkSession)
    // SparkSession is the entry point for SparkSQL 2.x; getOrCreate() builds it (or returns an existing one)
    val session: SparkSession = SparkSession.builder().appName("SqlTest1").master("local[4]").getOrCreate()

    // Create the RDD
    val lines: RDD[String] = session.sparkContext.textFile(args(0))

    // Parse the raw lines into Rows
    val RowRdd: RDD[Row] = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val age = fields(2).toInt
      val yz = fields(3).toDouble
      Row(id, name, age, yz)
    })

    // The result schema: effectively the table header that describes the DataFrame
    val sm: StructType = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("yz", DoubleType, true)
    ))

    // Associate the RowRDD with the schema
    val df: DataFrame = session.createDataFrame(RowRdd, sm)

    import session.implicits._
    val df1: Dataset[Row] = df.where($"yz" > 98).orderBy($"age".desc)
    df1.show()

    session.stop()
  }
}
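If you prefer writing SQL instead of the DataFrame DSL, the same query can also be expressed by registering the DataFrame as a temporary view and calling session.sql. This is only a minimal sketch that assumes the df and session from the code above (the view name v_user is an arbitrary placeholder), and it would go right before session.stop():

    // Register the DataFrame as a temporary view so it can be queried with SQL
    // (the view name "v_user" is just an example for this sketch)
    df.createTempView("v_user")

    // Same filter and ordering as the DSL version, written in SQL
    val df2: DataFrame = session.sql("SELECT id, name, age, yz FROM v_user WHERE yz > 98 ORDER BY age DESC")
    df2.show()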
As you can clearly see, SparkSQL 2.x creates a SparkSession as its entry point, whereas 1.x creates a SparkContext (and builds a SQLContext on top of it).
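For comparison, here is a minimal sketch of the two entry-point styles. The app names are placeholders, and in practice you would use one style or the other, not both in the same program:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SparkSession}

    // SparkSQL 1.x style: create a SparkContext first, then wrap it in a SQLContext
    val conf = new SparkConf().setAppName("SqlTest1x").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)   // 1.x entry point (deprecated since Spark 2.0)

    // SparkSQL 2.x style: SparkSession is the single entry point
    val session = SparkSession.builder().appName("SqlTest2x").master("local[4]").getOrCreate()
    val sc2 = session.sparkContext        // the underlying SparkContext is created and managed by the session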