1. 程式人生 > >spark建立DF的兩種方式

spark建立DF的兩種方式

方式一:反射:(使用這種方式來建立DF是在你知道欄位具體有哪些)
1.建立一個SparkContext,然後再建立SQLContext
2.先建立RDD,對資料進行整理,然後關聯case class,將非結構化的資料轉換成結構化資料
3.顯示的呼叫toDF方法,將RDD轉換成DF(需要隱私轉換)
4.註冊臨時表
5.執行SQL(Transformation,lazy)
6.zhixAction

val conf = new SparkConf().setAppName("df").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //建立一個普通RDD
    val rdd = sc.textFile("G:\\qf大資料\\spark\\day06_sql\\students.txt")
    val student: RDD[Student] = rdd.map(x => {
      val sp = x.split(" ")
      Student(sp(0).toInt, sp(1), sp(2).toInt)
    })
    import sqlContext.implicits._
    val df: DataFrame = student.toDF()
    df.registerTempTable("student")
    val df1 = sqlContext.sql("select * from student where age<18")
    df1.show()
    //將資料使用json的格式儲存,並且這裡使用的追加的操作。
    df1.write.mode(SaveMode.Append).json("")

方式二:介面方式:(不知道資料的具體欄位有哪些,一般開發中都會使用這種方式)
1.建立sparkContext,然後建立SQLContext
2.想建立RDD對資料進行整理,然後關聯Row,將非結構化資料轉換成結構化資料
3.定義schema(StructType(Array(StructField())
4.註冊臨時表
5.執行SQL
6.執行Action

 val conf = new SparkConf().setAppName("df02").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.textFile("G:\\qf大資料\\spark\\day06_sql\\students.txt")
    rdd.map(x=>{
      val str = x.split(",")
      Row(str(0).toInt,str(1),str(2).toInt)
    })
    val structType = StructType(Array(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true)))
    val df = sqlContext.createDataFrame(structType)
    df.registerTempTable("student")
    sqlContext.sql("select * from student")