SparkSQL學習記錄(SparkSQL 兩種Schema建立方式)
阿新 • • 發佈:2019-02-02
方式一:通過定義Case Class,使用反射推斷Schema(case class方式)
方式二:通過可程式設計介面,定義Schema,並應用到RDD上(createDataFrame 方式)
依賴:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.1</version>
</dependency>
方式一:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Approach 1: infer the schema via reflection from a case class.
case class Person(name: String, age: Int)

object SparkSqlDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparksqldemo1").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Parse each whitespace-separated line of test.txt into a Person.
    val people = sc.textFile("test.txt")
      .map(_.split(" "))
      .map(fields => Person(fields(0), fields(1).trim().toInt))

    // Implicit conversions: used to convert an RDD into a DataFrame.
    import sqlContext.implicits._
    val peopleDF = people.toDF()
    peopleDF.registerTempTable("people")

    // Caching / uncaching the temp table (left disabled in this demo):
    //sqlContext.cacheTable("people")
    //sqlContext.uncacheTable("people")
    //sqlContext.sql("CACHE TABLE people")
    //sqlContext.sql("UNCACHE TABLE people")

    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 10 and age <= 19")

    // DSL (Domain Specific Language) alternative:
    // In the DSL, a Scala symbol such as 'age denotes a column of the underlying
    // table; Spark's execution engine implicitly converts it into an expression.
    // Many DSL methods (where(), select(), limit(), ...) can be found in the DSL
    // submodule of the Catalyst module.
    // val teenagers = peopleDF.where('age >= 10).select('name)

    teenagers.map(row => "Name: " + row(0)).collect().foreach(println)
    sc.stop()
  }
}
方式二:
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

// Approach 2: build the schema programmatically and apply it to an RDD
// via createDataFrame.
object SparkSqlDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparksqldemo2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Build a StructType from the field names; every column is a nullable
    // StringType here (so "age" is compared as a string-cast value in SQL).
    val schemaString = "name age"
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Parse each whitespace-separated line of test.txt into a Row matching the schema.
    val rowRDD = sc.textFile("test.txt")
      .map(_.split(" "))
      .map(p => Row(p(0), p(1).trim))

    val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
    peopleDF.registerTempTable("people")

    sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
      .map(t => "Name: " + t(0)).collect().foreach(println)

    // Fix: the original never stopped the SparkContext (Demo1 does), leaking
    // the driver's resources; stop it before the JVM exits.
    sc.stop()
  }
}