Three Ways to Create a DataFrame in Spark
阿新 • Published 2019-01-27
1. Create a DataFrame by reading from a database
/** SQLContext: create a DataFrame, method 1 **/
def createDataFrame(sqlCtx: SQLContext): Unit = {
  val prop = new Properties()
  prop.put("user", "root")
  prop.put("password", "abc314")
  prop.put("driver", "com.mysql.jdbc.Driver")
  // Read the sy_users table from MySQL directly into a DataFrame
  val dataDF = sqlCtx.read.jdbc("jdbc:mysql://localhost:3306/test", "sy_users", prop)
  dataDF.show()
}
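The JDBC reader only needs the credentials and driver class packed into a `java.util.Properties` object before `read.jdbc` is called. A minimal, Spark-free sketch of that step, assuming the same placeholder user/password/driver values as the example (the `JdbcProps` object and `connectionProps` helper are illustrative, not from the article):

```scala
import java.util.Properties

object JdbcProps {
  // Build the connection properties expected by sqlCtx.read.jdbc(url, table, prop)
  def connectionProps(user: String, password: String, driver: String): Properties = {
    val prop = new Properties()
    prop.put("user", user)
    prop.put("password", password)
    prop.put("driver", driver)
    prop
  }

  def main(args: Array[String]): Unit = {
    val prop = connectionProps("root", "abc314", "com.mysql.jdbc.Driver")
    println(prop.getProperty("driver"))  // prints com.mysql.jdbc.Driver
  }
}
```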
2. Build a DataFrame with SQLContext's createDataFrame method
/** SQLContext: create a DataFrame, method 2 **/
def createDataFrame(sparkCtx: SparkContext, sqlCtx: SQLContext): Unit = {
  // Each line of studentInfo.txt is "name,age,studentNo"; turn it into a Row
  val rowRDD = sparkCtx.textFile("D://TxtData/studentInfo.txt")
    .map(_.split(","))
    .map(p => Row(p(0), p(1).toInt, p(2)))
  val schema = StructType(Seq(
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("studentNo", StringType, true)
  ))
  val dataDF = sqlCtx.createDataFrame(rowRDD, schema)
  // Register the DataFrame as an in-memory temporary table
  dataDF.registerTempTable("Student")
  val result = sqlCtx.sql("select * from Student")
  result.show()
  // dataDF.select("name").show()
  // dataDF.filter(dataDF("age") < 14).show()
  // dataDF.where("age <> ''").show()
}
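Before Spark ever applies the schema, each text line has to be split into correctly typed fields; a type mismatch here (e.g. a non-numeric age) fails at runtime, not at schema definition. The parsing stage of the example can be exercised on plain Scala collections without Spark; the `StudentParser` object and the sample rows below are illustrative assumptions, with lines in the `name,age,studentNo` layout the example uses:

```scala
object StudentParser {
  // Mirrors rowRDD's map stages: split on "," and convert the age field to Int
  def parseLine(line: String): (String, Int, String) = {
    val p = line.split(",")
    (p(0), p(1).toInt, p(2))
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("Alice,13,S001", "Bob,15,S002")  // sample rows, not from the article
    lines.map(parseLine).foreach(println)
  }
}
```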
3. Build a DataFrame via implicit conversion
/** SQLContext: create a DataFrame, method 3 **/
// Field names become the DataFrame's column names
case class Person(name: String, age: Int, studentNo: String)

def createDF(sparkCtx: SparkContext, sqlCtx: SQLContext): Unit = {
  import sqlCtx.implicits._  // brings the toDF() conversion into scope
  val dataDF = sparkCtx.textFile("D://TxtData/studentInfo.txt")
    .map(_.split(","))
    .map(p => Person(p(0), p(1).toInt, p(2)))
    .toDF()
  dataDF.show()
}
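The `import sqlCtx.implicits._` line is what makes `toDF()` appear on an RDD of case classes: it is ordinary Scala implicit enrichment, where an implicit wrapper class adds new methods to an existing type. A toy, Spark-free sketch of the same mechanism (the `Student` case class, `ImplicitsDemo` object, and `toNames` method are all illustrative, not Spark API):

```scala
// Field names of a case class are what toDF() turns into column names
case class Student(name: String, age: Int, studentNo: String)

object ImplicitsDemo {
  // Enrichment class: analogous to how implicits._ adds toDF() to an RDD
  implicit class StudentSeqOps(people: Seq[Student]) {
    def toNames: Seq[String] = people.map(_.name)
  }

  def main(args: Array[String]): Unit = {
    val people = Seq(Student("Alice", 13, "S001"), Student("Bob", 15, "S002"))
    // toNames is not a method of Seq; the implicit class in scope supplies it
    println(people.toNames)
  }
}
```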
The third way is recommended: concise and clear!
4. Example: writing a DataFrame into a Hive table
// Filter out invalid rows, register a temp table, then insert into a partitioned Hive table
result.where("userID <> '-' and newsID <> ''").registerTempTable("temp_newsTable")
hiveCtx.sql("INSERT OVERWRITE TABLE mmbigdata.ods_mm_news " +
  "partition (year='" + year + "',month='" + month + "',day='" + day + "',type='view') " +
  "select * from temp_newsTable")