
Big Data Real-Time Computing with Spark, Learning Notes (9): Spark SQL (1), Reading JSON Files

1 Spark SQL

  • Programming approaches: (1) SQL statements against a temp view; (2) the DataFrame API (a DataFrame API sketch follows the union example below)
scala> case class Customer(id:Int,name:String,age:Int)
defined class Customer

scala> val arr = Array("1,Mike,20","2,Mary,19","3,Jerry,23")
arr: Array[String] = Array(1,Mike,20, 2,Mary,19, 3,Jerry,23)

scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> rdd1.collect
res1: Array[String] = Array(1,Mike,20, 2,Mary,19, 3,Jerry,23)

scala> :paste
// Entering paste mode (ctrl-D to finish)

rdd1.map(e=>{
val arr = e.split(",")
Customer(arr(0).toInt,arr(1),arr(2).toInt)
})

// Exiting paste mode, now interpreting.

res2: org.apache.spark.rdd.RDD[Customer] = MapPartitionsRDD[2] at map at <console>:31

scala> val rdd2 = res2
rdd2: org.apache.spark.rdd.RDD[Customer] = MapPartitionsRDD[2] at map at <console>:31

scala> rdd2.collect
res3: Array[Customer] = Array(Customer(1,Mike,20), Customer(2,Mary,19), Customer(3,Jerry,23))

scala> val df = spark.createDataFrame(rdd2)
18/12/28 18:38:10 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

scala> df.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| Mike| 20|
|  2| Mary| 19|
|  3|Jerry| 23|
+---+-----+---+


scala> df.createTempView("customer")
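(createTempView throws an AnalysisException if a view with that name already exists; the Java examples in section 2 use createOrReplaceTempView to sidestep this.)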

scala> val df2 = spark.sql("select * from customer")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df2.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| Mike| 20|
|  2| Mary| 19|
|  3|Jerry| 23|
+---+-----+---+

scala> val df1 = spark.sql("select * from customer where id < 2")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df1.show
+---+----+---+
| id|name|age|
+---+----+---+
|  1|Mike| 20|
+---+----+---+


scala> val df2 = spark.sql("select * from customer where id > 2")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> df2.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  3|Jerry| 23|
+---+-----+---+

// union => vertical concatenation (stacking rows)
// df1 and df2 are first registered as the temp views c1 and c2:
scala> df1.createTempView("c1")

scala> df2.createTempView("c2")

scala> val dff = spark.sql("select * from c1 union select * from c2")
dff: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> dff.show
+---+-----+---+                                                                 
| id| name|age|
+---+-----+---+
|  3|Jerry| 23|
|  1| Mike| 20|
+---+-----+---+
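
As noted at the start of this section, the same queries can also be expressed with the DataFrame API instead of SQL strings. A minimal sketch (not from the original notes) against the df created above:

// DataFrame API equivalents of the SQL queries above
df.filter(df("id") < 2).show()   // where id < 2
df.filter(df("id") > 2).show()   // where id > 2
// SQL's union removes duplicate rows, while the DataFrame union() keeps them,
// so add distinct() to match the SQL result
df.filter(df("id") < 2).union(df.filter(df("id") > 2)).distinct().show()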


2 Spark SQL: Reading JSON Files

  • Dataset<Row> === DataFrame; it can be queried like a table
  • SparkSession.read().json() reads a JSON file into a DataFrame
  • Dataset.write().json() writes a DataFrame back out as JSON (see the sketch after the first example below)
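Note that by default Spark's JSON source expects one complete JSON object per line (JSON Lines), not a pretty-printed array. A hypothetical d:/json.json with the fields the examples below assume (id, name, age):

{"id":1,"name":"Mike","age":20}
{"id":2,"name":"Mary","age":19}
{"id":3,"name":"Jerry","age":23}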

2.1 Spark SQL, Java Version

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;


public class SQLJava {
    public static void main(String[] args) {

        // Build (or reuse) a SparkSession running locally with 2 threads
        SparkSession session = SparkSession.builder()
                .appName("SQLJava")
                .config("spark.master", "local[2]")
                .getOrCreate();

        // Read the JSON file; the schema is inferred from the data
        Dataset<Row> df = session.read().json("d:/json.json");

        // Register a temp view so the DataFrame can be queried with SQL
        df.createOrReplaceTempView("stu");
        df = session.sql("select * from stu");
        df.show();
    }
}
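
The write direction from the bullet list above is symmetric. A minimal sketch, here in Scala for brevity (in Java it would be df.write().json(...)), with d:/json_out as a hypothetical output directory:

// Write the DataFrame back out as JSON Lines; the path is a directory
// and must not already exist (Spark writes one file per partition)
df.coalesce(1)          // optional: collapse to a single partition/file
  .write
  .json("d:/json_out")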

The extended version below adds a filtered query, an aggregate query, and a DataFrame-to-RDD conversion:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.function.Consumer;


public class SQLJava {
    public static void main(String[] args) {

        SparkSession session = SparkSession.builder()
                .appName("SQLJava")
                .config("spark.master", "local[2]")
                .getOrCreate();

        Dataset<Row> df1 = session.read().json("d:/json.json");

        df1.createOrReplaceTempView("stu");
        df1 = session.sql("select * from stu");
        df1.show();

        // filtered query
        Dataset<Row> df2 = session.sql("select * from stu where age > 20");
        df2.show();
        System.out.println("=============================");

        // aggregate query
        Dataset<Row> dfCount = session.sql("select count(*) from stu");
        dfCount.show();

        /*
         * Convert the DataFrame to an RDD
         * */
        JavaRDD<Row> rdd = df1.toJavaRDD();
        // JSON integers are inferred as LongType, so the fields are read as Long
        rdd.collect().forEach(new Consumer<Row>() {
            public void accept(Row row) {
                Long id = row.getAs("id");
                String name = row.getAs("name");
                Long age = row.getAs("age");
                System.out.println(id + "-" + name + "-" + age);
            }
        });


    }
}
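
For comparison, the same conversion in Scala, as a sketch: df.rdd yields an RDD[Row], and with a hypothetical Student case class the rows can be read as typed objects instead of via Row.getAs:

// Scala counterpart of the DataFrame-to-RDD conversion above (sketch)
case class Student(id: Long, name: String, age: Long)  // JSON ints infer as Long

val df = spark.read.json("d:/json.json")
val rowRdd = df.rdd                                    // RDD[Row], like toJavaRDD()
import spark.implicits._
df.as[Student].collect()                               // typed Dataset instead of Rows
  .foreach(s => println(s"${s.id}-${s.name}-${s.age}"))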