程式人生 > Spark 建立DataFrame的三種方式

Spark 建立DataFrame的三種方式

1.從資料庫讀資料建立DF

  /**
   * Way 1: build a DataFrame by reading a MySQL table over JDBC via SQLContext.
   *
   * @param sqlCtx the SQLContext used to issue the JDBC read
   */
  def createDataFrame(sqlCtx: SQLContext): Unit = {
    // JDBC connection properties.
    // NOTE(review): credentials are hard-coded for the demo — load from config in real code.
    val prop = new Properties()
    prop.put("user","root")
    prop.put("password","abc314")
    prop.put("driver","com.mysql.jdbc.Driver")
    // Read table "sy_users" from the local MySQL "test" database into a DataFrame.
    val dataDF = sqlCtx.read.jdbc("jdbc:mysql://localhost:3306/test","sy_users",prop)
    dataDF.show()
  }

2. 通過SQLContext的createDataFrame方法構建DF

/**
 * Way 2: build a DataFrame from an RDD[Row] plus an explicit StructType schema,
 * then register it as an in-memory temp table and query it back with SQL.
 *
 * @param sparkCtx the SparkContext used to read the text file
 * @param sqlCtx   the SQLContext used to build and query the DataFrame
 */
  def createDtaFrame(sparkCtx:SparkContext,sqlCtx:SQLContext):Unit = {
    // Parse each CSV line into a Row(name, age, studentNo).
    val rowRDD = sparkCtx
      .textFile("D://TxtData/studentInfo.txt")
      .map(line => line.split(","))
      .map(parts => Row(parts(0), parts(1).toInt, parts(2)))

    // Column definitions matching the Row layout above; all columns nullable.
    val columns = Seq(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("studentNo", StringType, true)
    )
    val dataDF = sqlCtx.createDataFrame(rowRDD, StructType(columns))

    // Register the DataFrame as a temp table so it can be queried with SQL.
    dataDF.registerTempTable("Student")
    val result = sqlCtx.sql("select * from Student")
    result.show()

    //    dataDF.select("name").show()
    //    dataDF.filter(dataDF("age") <14).show()
    //    dataDF.where("age <> ''").show()
  }

3.通過隱式轉換構建DF

 /**
  * Way 3: build a DataFrame via implicit conversion — map each line to a case
  * class instance and call toDF() (column names are derived from the field names).
  *
  * @param sparkCtx the SparkContext used to read the text file
  * @param sqlCtx   the SQLContext whose implicits enable toDF()
  */
  case class Person(str: String, i: Int, str1: String)
  def createDF(sparkCtx:SparkContext,sqlCtx:SQLContext):Unit = {
    import sqlCtx.implicits._
    // Each CSV line becomes a Person; toDF() turns the RDD into a DataFrame.
    val people = sparkCtx
      .textFile("D://TxtData/studentInfo.txt")
      .map(_.split(","))
      .map(cols => Person(cols(0), cols(1).toInt, cols(2)))
    val dataDF = people.toDF()
    dataDF.show()
  }

推薦第三種方式,簡潔明瞭!

4.DF入Hive庫例項

 // Section 4 example: write a filtered DataFrame into a partitioned Hive table.
 // NOTE(review): fragment — `result`, `hiveCtx`, `year`, `month`, `day` are defined
 // outside this snippet; it cannot compile on its own.
 // Keep only rows with a real userID and a non-empty newsID, then expose them as a temp table.
 result.where("userID <> '-' and newsID <> ''").registerTempTable("temp_newsTable")
    // Overwrite the (year, month, day, type='view') partition with the filtered rows.
    // NOTE(review): the partition clause is built by string concatenation — confirm
    // year/month/day come from trusted code paths, not user input (SQL-injection risk).
    hiveCtx.sql("INSERT OVERWRITE TABLE mmbigdata.ods_mm_news " +
      "partition (year='"+year+"',month='"+month+"',day='"+day+"',type='view') select * from temp_newsTable")