Spark SQL, 1.x edition
阿新 · Published 2018-12-27
package Test01

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * To read data with the DataFrame or SQL API, first convert the unstructured
 * data into structured data, then register a view, run the SQL, and finally
 * trigger an action.
 */
case class Emp(empno: Int, ename: String, job: String, mgr: String,
               hiredate: String, sal: Int, comm: String, deptno: Int)

object SparkSql01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSql01").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // SQLContext wraps SparkContext and enhances it so it can handle structured data
    val sqlContext = new SQLContext(sc)
    // Read the data source
    val lines: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(","))
    // Map each record onto the Emp case class
    val allEmp = lines.map(x => Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))
    // Import the implicit conversions that turn an RDD of case classes into a DataFrame
    import sqlContext.implicits._
    val df1 = allEmp.toDF()
    df1.show()
    // Release resources
    sc.stop()
  }
}
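The header comment describes registering a view and running SQL, but the example above stops at show(). A minimal sketch of that missing step with the Spark 1.x API, to be placed before sc.stop(); the table name "emp" and the query are illustrative:

    // Register the DataFrame as a temporary table (the Spark 1.x API,
    // replaced by createOrReplaceTempView in 2.x); "emp" is an illustrative name
    df1.registerTempTable("emp")
    // Run SQL against the registered table; show() is the action that triggers execution
    sqlContext.sql("SELECT ename, sal FROM emp WHERE sal > 2000").show()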
The second approach
package Test01

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SQLDemox2 {
  def main(args: Array[String]): Unit = {
    // Entry point of a Spark RDD program
    val conf = new SparkConf().setAppName("SQLDemox2").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // SQLContext wraps SparkContext and enhances it so it can handle structured data
    val sqlContext = new SQLContext(sc)
    // Read the data source
    val lines: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(","))
    // Shape the data: Row + schema
    val rowRDD: RDD[Row] = lines.map(line => {
      Row(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt)
    })
    // Define the table structure
    val schema = StructType(
      List(
        StructField("empno", IntegerType),
        StructField("ename", StringType),
        StructField("job", StringType),
        StructField("mgr", StringType),
        StructField("hiredate", StringType),
        StructField("sal", IntegerType),
        StructField("comm", StringType),
        StructField("deptno", IntegerType)
      )
    )
    // Associate the RDD of Rows with the schema
    val df: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    val result = df.select("empno", "ename", "sal")
    result.show()
    // Release resources
    sc.stop()
  }
}
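Both programs end at show(); to persist the selected columns instead, here is a minimal sketch assuming Spark 1.4+, where the DataFrame.write API became available. The output path is an assumption for illustration; add the lines before sc.stop():

    import org.apache.spark.sql.SaveMode

    // Write the selected columns out as JSON via DataFrameWriter (Spark 1.4+);
    // the output path "/tmp/emp_result" is illustrative
    result.write.mode(SaveMode.Overwrite).json("/tmp/emp_result")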