1. 程式人生 > >-spark基礎操作

-spark基礎操作

dataframe

spark dataframe派生於RDD類,但是提供了非常強大的資料操作功能。主要對類SQL的支援。

DataFrame是一種以RDD為基礎的分散式資料集,類似於傳統資料塊中的表,它與RDD最主要的區別在於:DataFrame有schema元資料,即DataFrame所表示的資料集的每一列都有名稱和資料型別。正是因為有了這些schema元資料,Sparl SQL的查詢優化器就可以進行鍼對性的優化。

spark dataframe 的幾個關鍵點:

  • 分散式的資料集

  • 類似關係型資料庫中的table,或者 excel 裡的一張 sheet,或者 python/R 裡的 dataframe

  • 擁有豐富的操作函式,類似於 rdd 中的運算元

  • 一個 dataframe 可以被註冊成一張資料表,然後用 sql 語言在上面操作

  • 豐富的建立方式

    • 已有的RDD

    • 結構化資料檔案

    • JSON資料集

    • Hive表

    • 外部資料庫

RDD和 DataFrame的比較

# 前者沒有schema資訊;後者有schema資訊

# RDD無法得知所存的資料元素的具體內部結構,Spark Core只能在stage層面進行簡單的優化;後者因為有schema資訊,Sparl SQL的查詢優化器就可以進行鍼對性的優化

# RDD通過函式式呼叫API,雖然簡潔明瞭,但是需要建立新的物件,不容易重用舊的物件,給GC帶來挑戰;DataFrame是儘可能的重用物件

在實際工作中會遇到這樣的情況,主要是會進行兩個資料集的篩選、合併,重新入庫。

首先載入資料集,然後在提取資料集的前幾行過程中,才找到limit的函式。

而合併就用到union函式,重新入庫,就是registerTemple註冊成表,再進行寫入到HIVE中。

不得不讚嘆dataframe的強大。

DataFrame建立方式

跟關係資料庫的表(Table)一樣,DataFrame是Spark中對帶模式(schema)行列資料的抽象。DateFrame廣泛應用於使用SQL處理大資料的各種場景。建立DataFrame有很多種方法,比如從本地List建立、從RDD建立或者從源資料建立,下面簡要介紹建立DataFrame的三種方法。

方法一,Spark中使用toDF函式建立DataFrame

通過匯入(importing)Spark sql implicits, 就可以將本地序列(seq), 陣列或者RDD轉為DataFrame。只要這些資料的內容能指定資料型別即可。

本地seq + toDF建立DataFrame示例:

import sqlContext.implicits._
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
​ sqlContext.implicits._
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
).toDF("int_column", "string_column", "date_column")
​

注意:如果直接用toDF()而不指定列名字,那麼預設列名為"1", "2", ...

通過case class + toDF建立DataFrame的示例

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
​
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
​
// 使用 sqlContext 執行 sql 語句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
​
// 注:sql()函式的執行結果也是DataFrame,支援各種常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
​
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
​
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
​
// 使用 sqlContext 執行 sql 語句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
​
// 注:sql()函式的執行結果也是DataFrame,支援各種常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
​

方法二,Spark中使用createDataFrame函式建立DataFrame

SqlContext中使用createDataFrame也可以建立DataFrame。跟toDF一樣,這裡建立DataFrame的資料形態也可以是本地陣列或者RDD。

通過row+schema建立示例

import org.apache.spark.sql.types._
val schema = StructType(List(
    StructField("integer_column", IntegerType, nullable = false),
    StructField("string_column", StringType, nullable = true),
    StructField("date_column", DateType, nullable = true)
))
​
val rdd = sc.parallelize(Seq(
  Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
​ org.apache.spark.sql.types._
val schema = StructType(List(
    StructField("integer_column", IntegerType, nullable = false),
    StructField("string_column", StringType, nullable = true),
    StructField("date_column", DateType, nullable = true)
))
​
val rdd = sc.parallelize(Seq(
  Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
  Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
​

方法三,通過檔案直接建立DataFrame

使用parquet檔案建立


val df = sqlContext.read.parquet("hdfs:/path/to/file")val df = sqlContext.read.parquet("hdfs:/path/to/file")

使用json檔案建立

val df = spark.read.json("examples/src/main/resources/people.json")
​
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+ df = spark.read.json("examples/src/main/resources/people.json")
​
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
/**
  * DataFrame API基本操作
  */
object DataFrameApp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("DataFrameApp")
      .master("local[2]")
      .getOrCreate()

    // 將json檔案載入成一個dataframe
    val peopleDF = spark.read
      .format("json")
      .load("people.json")

    // 輸出dataframe對應的schema資訊
    peopleDF.printSchema()

    // 輸出資料集的前20條記錄
    peopleDF.show()

    //查詢某列所有的資料: select name from table
    peopleDF.select("name").show()

    // 查詢某幾列所有的資料,並對列進行計算: select name, age+10 as age2 from table
    peopleDF
      .select(peopleDF.col("name"), (peopleDF.col("age") + 10).as("age2"))
      .show()

    //根據某一列的值進行過濾: select * from table where age>19
    peopleDF.filter(peopleDF.col("age") > 19).show()

    //根據某一列進行分組,然後再進行聚合操作: select age,count(1) from table group by age
    peopleDF.groupBy("age").count().show()

    spark.stop()
  }
  
}
/**
  * RDD to DataFrame以及DataFrame操作
  */
object DataFrameCase {

  case class Student(id: Int, name: String, phone: String, email: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DataFrameRDDApp")
      .master("local[2]")
      .getOrCreate()

    // RDD ==> DataFrame
    val rdd =
      spark.sparkContext.textFile("student.data")

    //注意:需要匯入隱式轉換
    import spark.implicits._
    val studentDF = rdd
      .map(_.split("""|"""))
      .map(line => Student(line(0).toInt, line(1), line(2), line(3)))
      .toDF()

    //show預設只顯示前20條
    studentDF.show
    studentDF.show(30)
    studentDF.show(30, false)

    studentDF.take(10)
    studentDF.first()
    studentDF.head(3)

    studentDF.select("email").show(30, false)

    studentDF.filter("name=''").show
    studentDF.filter("name='' OR name='NULL'").show

    //name以M開頭的人
    studentDF.filter("SUBSTR(name,0,1)='M'").show

    studentDF.sort(studentDF("name")).show
    studentDF.sort(studentDF("name").desc).show

    studentDF.sort("name", "id").show
    studentDF.sort(studentDF("name").asc, studentDF("id").desc).show

    studentDF.select(studentDF("name").as("student_name")).show

    val studentDF2 = rdd
      .map(_.split("\\|"))
      .map(line => Student(line(0).toInt, line(1), line(2), line(3)))
      .toDF()

    studentDF
      .join(studentDF2, studentDF.col("id") === studentDF2.col("id"))
      .show

    spark.stop()

  }

}
package spark_basic

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{
  StringType,
  IntegerType,
  StructField,
  StructType
}

/**
  * DataFrame和RDD的互操作
  */
object DataFrameRDDApp {

  case class Info(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("DataFrameRDDApp")
      .master("local[2]")
      .getOrCreate()

    inferReflection(spark)

    program(spark)

    spark.stop()
  }

  def inferReflection(spark: SparkSession): Unit = {

    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    //注意:需要匯入隱式轉換
    import spark.implicits._
    val infoDF = rdd
      .map(_.split(","))
      .map(line => Info(line(0).toInt, line(1), line(2).toInt))
      .toDF()

    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show

    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  def program(spark: SparkSession): Unit = {
    // RDD ==> DataFrame
    val rdd = spark.sparkContext.textFile("file:///Users/rocky/data/infos.txt")

    val infoRDD = rdd
      .map(_.split(","))
      .map(line => Row(line(0).toInt, line(1), line(2).toInt))

    val structType = StructType(
      Array(StructField("id", IntegerType, true),
            StructField("name", StringType, true),
            StructField("age", IntegerType, true)))

    val infoDF = spark.createDataFrame(infoRDD, structType)
    infoDF.printSchema()
    infoDF.show()

    //通過df的api進行操作
    infoDF.filter(infoDF.col("age") > 30).show

    //通過sql的方式進行操作
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

}

package spark_basic

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * HiveContext的使用
  * 使用時需要通過--jars 把mysql的驅動傳遞到classpathÒ
  */
object HiveContextApp {

  def main(args: Array[String]): Unit = {

    val warehouseLocation = "spark-warehouse"

    val sparkConf =
      new SparkConf().set("spark.sql.warehouse.dir", warehouseLocation)

    val spark =
      SparkSession.builder.enableHiveSupport.config(sparkConf).getOrCreate()

    //2)相關的處理:
    spark.table("emp").show

    //3)關閉資源
    spark.stop()
  }
}
package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * 使用外部資料來源綜合查詢Hive和MySQL的表資料
  */
object HiveMySQLApp {

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("HiveMySQLApp")
      .master("local[2]")
      .getOrCreate()

    // 載入Hive表資料
    val hiveDF = spark.table("emp")

    // 載入MySQL表資料
    val mysqlDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // JOIN
    val resultDF =
      hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show

    resultDF
      .select(hiveDF.col("empno"),
              hiveDF.col("ename"),
              mysqlDF.col("deptno"),
              mysqlDF.col("dname"))
      .show

    spark.stop()
  }
}
package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * Parquet檔案操作
  */
object ParquetApp {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("SparkSessionApp")
      .master("local[2]")
      .getOrCreate()

    /**
      * spark.read.format("parquet").load 這是標準寫法
      */
    val userDF = spark.read
      .format("parquet")
      .load("users.parquet")

    userDF.printSchema()
    userDF.show()

    userDF.select("name", "favorite_color").show

    userDF
      .select("name", "favorite_color")
      .write
      .format("json")
      .save("file:///home/hadoop/tmp/jsonout")

    spark.read
      .load("users.parquet")
      .show

    //會報錯,因為sparksql預設處理的format就是parquet
    spark.read
      .load("people.json")
      .show

    spark.read
      .format("parquet")
      .option("path", "users.parquet")
      .load()
      .show

    spark.stop()
  }
}
package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * Schema Infer
  *
  * "spark.sql.sources.partitionColumnTypeInference.enabled" 預設是 true
  */
object SchemaInferApp {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .appName("SchemaInferApp")
      .master("local[2]")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", false)
      .getOrCreate()

    val df = spark.read
      .format("json")
      .load("file:///Users/rocky/data/json_schema_infer.json")

    df.printSchema()

    df.show()

    spark.stop()
  }

}
package spark_basic

import org.apache.spark.sql.SparkSession

/**
  * SparkSession的使用
  *
  * SparkSession是spark2.0以後預設的的統一客戶端程式入口。
  *
  * sparkSession是HiveContext和sqlContext的統一入口
  * sparkContext可以通過spark.sparkContext獲得
  */
object SparkSessionApp {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .appName("SparkSessionApp")
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    val people = spark.read.json("file:///Users/rocky/data/people.json")
    people.show()

    spark.stop()
  }
}
package spark_basic

import java.sql.DriverManager

/**
  *  通過JDBC的方式訪問
  */
object SparkSQLThriftServerApp {

  def main(args: Array[String]) {

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn = DriverManager.getConnection("jdbc:hive2://hadoop001:14000","hadoop","")
    val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      println("empno:" + rs.getInt("empno") +
        " , ename:" + rs.getString("ename") +
        " , sal:" + rs.getDouble("sal"))

    }

    rs.close()
    pstmt.close()
    conn.close()
  }
}
package spark_basic

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

/**
  * SQLContext的使用:
  * 注意:IDEA是在本地,而測試資料是在伺服器上 ,能不能在本地進行開發測試的?
  */
object SQLContextApp {

  def main(args: Array[String]): Unit = {

    val path = args(0)

    //1)建立相應的Context
    val sparkConf = new SparkConf()

    //在測試或者生產中,AppName和Master我們是通過指令碼進行指定
    //sparkConf.setAppName("SQLContextApp").setMaster("local[2]")

    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext

    //2)相關的處理: json
    val people = spark.sqlContext.read.format("json").load(path)
    people.printSchema()
    people.show()

    //3)關閉資源
    sc.stop()
  }
}