
Creating Spark DataFrames (reading from data sources)
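All of the snippets below assume an existing SparkSession named spark. A minimal sketch of creating one (the app name is a placeholder, not from the original):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataFrameDemo") // hypothetical app name
  .master("local[2]")
  .getOrCreate()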

1. From an external JSON file

val data = spark.read.json("hdfs://cslcdip/home/dip/lzm/sparkdata/people.json")
println(data.schema)
data.show()
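Reading JSON makes an extra pass over the data to infer the schema. If the schema is known in advance, it can be supplied explicitly and the inference pass is skipped. A sketch, assuming people.json holds name/age records as above:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", LongType, nullable = true)
))
val data = spark.read.schema(schema).json("hdfs://cslcdip/home/dip/lzm/sparkdata/people.json")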

2. From an RDD of JSON strings

val nameRDD = spark.sparkContext.makeRDD(Array(
  "{\"name\":\"zhangsan\",\"age\":18}",
  "{\"name\":\"lisi\",\"age\":19}",
  "{\"name\":\"wangwu\",\"age\":20}"
))
val nameDF = spark.read.json(nameRDD)
println(nameDF.schema)
nameDF.show()

3. From a Parquet file

spark.sparkContext.setLogLevel("error")
val data = spark.read.parquet("hdfs://cslcdip/home/dip/lzm/sparkdata/users.parquet")
println(data.schema)
data.show()
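Parquet embeds the schema in the file itself, so no inference is needed on read. Writing a DataFrame back out is symmetrical; a sketch (the output path is illustrative):

data.write.mode("overwrite").parquet("hdfs://cslcdip/home/dip/lzm/sparkdata/users_copy.parquet")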

4. From MySQL (add the JDBC driver to the POM)

spark.sparkContext.setLogLevel("error")
val data = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://172.16.10.20:3306/hue")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "hue")
  .option("password", "hue")
  .option("dbtable", "auth_user")
  .load()
data.printSchema()
data.show()
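By default the JDBC source pulls the whole table through a single connection. For larger tables the read can be parallelized over a numeric column; a sketch (the column name id and the bounds are assumptions about the auth_user table):

val parallel = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://172.16.10.20:3306/hue")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "hue")
  .option("password", "hue")
  .option("dbtable", "auth_user")
  .option("partitionColumn", "id") // assumed numeric primary key
  .option("lowerBound", "1")       // assumed id range
  .option("upperBound", "10000")
  .option("numPartitions", "4")
  .load()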

5. From Hive (add spark-hive to the POM, commenting out its default provided scope, and specify the connecting user)

package com.cslc

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.collection.JavaConversions._
import org.apache.spark.sql.SparkSession

object Day01 {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession.builder
    // Load the cluster's Hadoop/Hive client configuration files from local copies.
    val conf = new Configuration()
    conf.addResource(new Path("F:\\IdeaWorkspace\\lzm\\Resource\\core-site.xml"))
    conf.addResource(new Path("F:\\IdeaWorkspace\\lzm\\Resource\\hdfs-site.xml"))
    conf.addResource(new Path("F:\\IdeaWorkspace\\lzm\\Resource\\hive-site.xml"))
    conf.addResource(new Path("F:\\IdeaWorkspace\\lzm\\Resource\\yarn-site.xml"))
    conf.addResource(new Path("F:\\IdeaWorkspace\\lzm\\Resource\\mapred-site.xml"))
    // Copy every configuration entry into the SparkSession builder.
    for (entry <- conf.iterator()) {
      sparkBuilder.config(entry.getKey, entry.getValue)
    }
    // Connect to the cluster as the "dip" user.
    System.setProperty("user.name", "dip")

    val spark: SparkSession = sparkBuilder.master("local[2]").enableHiveSupport().getOrCreate()
    spark.sql("show databases").show()
    spark.stop()
  }
}
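Once enableHiveSupport() succeeds, Hive tables can be queried like any other source; a sketch (the database and table names are placeholders, not from the original):

val df = spark.sql("select * from some_db.some_table limit 10") // hypothetical table
df.printSchema()
df.show()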
The Maven dependencies used in these examples (Scala 2.11, Spark 2.3.0):

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.25</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>2.3.0</version>
      <!--<scope>provided</scope>-->
    </dependency>
  </dependencies>