
SparkSQL study notes (for quick reference)

1、SQLContextApp

package sparkSQLmook

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}


/**
  * Spark 1.6 SQL usage (SQLContext): the commented-out main is local mode, the active one production mode
  */
object SQLContextApp {

 /* def main(args: Array[String]): Unit = {

      //val path = args(0)
      val conf = new SparkConf()
                   .setAppName("SQLContextApp")
                   .set("spark.sql.warehouse.dir","file:///")
                   .setMaster("local[2]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)


      val df = sqlContext.read.format("json").load("C://Users//shujuelin//Desktop//spark//people.json")

      df.show()
      sc.stop()
  }*/

  //Production mode: the input path is taken from args(0)
  def main(args: Array[String]): Unit = {

     val path = args(0)
      val conf = new SparkConf()

      val sc = new SparkContext(conf)

      val sqlContext = new SQLContext(sc)

      val df = sqlContext.read.format("json").load(path)


      df.show()
      sc.stop()
  }
}
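A small follow-up sketch for the Spark 1.6 API above (same SQLContext, assuming a people.json with name/age columns): the DataFrame can be registered as a temp table and queried with SQL.

val df = sqlContext.read.format("json").load(path)
df.registerTempTable("people")   // Spark 1.6 API; replaced by createOrReplaceTempView in 2.x
sqlContext.sql("select name, age from people").show()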

2、SparkSessionApp

package sparkSQLmook

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession

object SparkSessionApp {

  def main(args: Array[String]): Unit = {

     val spark = SparkSession
                       .builder()
                        .master("local")
                        .appName("SparkSessionApp")
                        .config("spark.sql.warehouse.dir","file:///")
                        .getOrCreate()

    val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")

    df.show()

    spark.stop()
  }

}

 

3、ParquetDemo

package sparkSQLmook

import org.apache.spark.sql.SparkSession

object ParquetDemo {

  def main(args: Array[String]): Unit = {

     val spark = SparkSession
                  .builder()
                   .appName("ParquetDemo")
                   .master("local[2]")
                   .config("spark.sql.warehouse.dir","file:///")
                  .getOrCreate()

    /**
      * spark.read.format("parquet").load is the standard (explicit-format) way to read
      */
     //val rddDF = spark.read.format("parquet").load("C://Users//shujuelin//Desktop//spark//users.parquet")
     //rddDF.show()

    //Spark SQL's default data source format is parquet (see the write sketch after this listing)
    // rddDF.select("name","favorite_color").write.format("json").save("C://Users//shujuelin//Desktop//spark//userss.json")

    /**
      * Generic form: load() without an explicit format falls back to the default (parquet)
      */
    val DF = spark.read.load("C://Users//shujuelin//Desktop//spark//users.parquet")
    DF.show(false)
    spark.stop()
  }

}
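A minimal write-side sketch for the commented-out save above (the output directory users_json is a placeholder): because Parquet is the default format, save() without format() writes Parquet, while an explicit format("json") switches the output.

import org.apache.spark.sql.SaveMode

val usersDF = spark.read.load("C://Users//shujuelin//Desktop//spark//users.parquet")
usersDF.select("name", "favorite_color")
  .write
  .mode(SaveMode.Overwrite)          // overwrite the target directory if it already exists
  .format("json")
  .save("C://Users//shujuelin//Desktop//spark//users_json")   // hypothetical output path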

4、JdbcBeelineSQL

package sparkSQLmook

import java.sql.DriverManager

//Access Spark SQL (Thrift Server) through JDBC
object JdbcBeelineSQL {

  def main(args: Array[String]) {

    Class.forName("org.apache.hive.jdbc.HiveDriver")

    val conn = DriverManager.getConnection("jdbc:hive2://spark1:10000","root","")
    val pstmt = conn.prepareStatement("select name,age,score from students")
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      println("name:" + rs.getString("name") +
        " , age:" + rs.getInt("age") +
        " , score:" + rs.getDouble("score"))

    }

    rs.close()
    pstmt.close()
    conn.close()


  }
}
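A hedged variant of the same JDBC access (same spark1:10000 Thrift Server and students table assumed): binding a parameter through the PreparedStatement and closing the connection in a finally block.

val conn = DriverManager.getConnection("jdbc:hive2://spark1:10000", "root", "")
try {
  // the "age > ?" filter is only an illustrative condition
  val pstmt = conn.prepareStatement("select name, age, score from students where age > ?")
  pstmt.setInt(1, 18)
  val rs = pstmt.executeQuery()
  while (rs.next()) {
    println(rs.getString("name") + ", " + rs.getInt("age") + ", " + rs.getDouble("score"))
  }
  rs.close()
  pstmt.close()
} finally {
  conn.close()                       // always release the connection, even if the query fails
}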

5、HiveMySQLApp

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
 * Query Hive and MySQL table data together through the external data source API
  * Comprehensive external data source example (important): combine a Hive table with a MySQL table
create database spark;
use spark;
//create the table
CREATE TABLE DEPT(
DEPTNO int(2) PRIMARY KEY,
DNAME VARCHAR(14) ,
LOC VARCHAR(13) ) ;

  //insert data
INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');

  */
object HiveMySQLApp {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("HiveMySQLApp")
      .master("local[2]").getOrCreate()

    // Load the Hive table
    val hiveDF = spark.table("emp")

    // Load the MySQL table via the JDBC data source
    val mysqlDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "root")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

    // JOIN
    val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show


    resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),
      mysqlDF.col("deptno"), mysqlDF.col("dname")).show

    spark.stop()
  }

}
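A minimal follow-up sketch (the target table spark.dept_emp is hypothetical): the joined result can be written back to the same MySQL instance through the JDBC data source.

resultDF.select(hiveDF.col("empno"), hiveDF.col("ename"), mysqlDF.col("dname"))
  .write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306")
  .option("dbtable", "spark.dept_emp")      // hypothetical target table
  .option("user", "root")
  .option("password", "root")
  .option("driver", "com.mysql.jdbc.Driver")
  .mode("append")                           // append instead of failing if the table exists
  .save()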

6、HiveAPP

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
  * Spark SQL operations on Hive tables   1. Read: spark.table(tableName)
  * 2. Write: df.write.saveAsTable(tableName)  (a write sketch follows this listing)
  */
object HiveAPP {
  //Uses the spark.sql approach ---> can also be run in spark-shell
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("HiveAPP")
      .master("local[2]")
      .config("spark.sql.warehouse.dir","file:///")
      .getOrCreate()

    import spark.implicits._

    //val hiveDF = spark.table("t_movies")
    spark.sql("show databases").show()

    spark.stop()
  }
}
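A minimal sketch of the write path mentioned in the comment above (t_movies is the table from the commented-out line; t_movies_backup is a hypothetical target, and on a real cluster the session normally also needs enableHiveSupport()):

val hiveDF = spark.table("t_movies")                            // read an existing Hive table
hiveDF.write.mode("overwrite").saveAsTable("t_movies_backup")   // write it back as a managed table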

7、DatasetApp

 

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
 * Dataset operations -> reading a csv file
 */
object DatasetApp {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("DatasetApp")
      .master("local[2]").getOrCreate()

    //Note: the implicit conversions need to be imported
    import spark.implicits._

    val path = "file:///f:/text/sales.csv"

    //How does Spark parse a csv file? header: the first row holds the column names; inferSchema: infer the schema automatically
    val df = spark.read.option("header","true").option("inferSchema","true").csv(path)
    df.show

    val ds = df.as[Sales]  //convert the DataFrame to a Dataset
    //map iterates over the rows, taking only the itemId from each one
    ds.map(line => line.itemId).show


    spark.sql("seletc name from person").show

    //df.seletc("name")
    df.select("name")

    ds.map(line => line.itemId)

    spark.stop()
  }

  case class Sales(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
}
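A minimal sketch of further typed operations on the Dataset[Sales] above (the 100.0 threshold is arbitrary): filter and groupByKey work directly against the case-class fields, with encoders coming from the spark.implicits._ import.

val bigSales = ds.filter(_.amountPaid > 100.0)   // typed filter on a case-class field
bigSales.show()

ds.groupByKey(_.customerId)                      // typed grouping; returns a KeyValueGroupedDataset
  .count()
  .show()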

8、DataFrameRdd

 

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
  * Convert an RDD to a DataFrame. Approach 1: reflection (via a case class)
  */

object DataFrameRdd {

  def main(args: Array[String]): Unit = {

      val spark = SparkSession.builder()
                   .appName("DataFrameRdd")
                   .master("local[2]")
                    .config("spark.sql.warehouse.dir","file:///")
                     .getOrCreate()

     //RDD ==> DataFrame

     val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

     import spark.implicits._

    //convert the RDD to a DataFrame
     val lineDF = rdd.map(_.split(",")).map(line => info(line(0).toInt,line(1),line(2).toInt)).toDF()
    /* val lineRDD = rdd.map(line => line.split(","))
     val lineDF = lineRDD.map(lines => info(lines(0).toInt,lines(1),lines(2).toInt)).toDF()*/
     //lineDF.show()

     //1. Programming against the DataFrame API
     //lineDF.filter($"age">20).show()
    //2. SQL-based API
     lineDF.createOrReplaceTempView("info")
     spark.sql("select name,age from info where age >20").show()

      spark.stop()
  }

  case class info(id : Int, name : String, age : Int)

}
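A minimal sketch of the reverse direction (same lineDF and info case class as above): .rdd turns the DataFrame back into an RDD[Row], whose fields can be extracted by pattern matching; .as[info] would give a typed Dataset instead.

import org.apache.spark.sql.Row

val rowRdd = lineDF.rdd                                          // DataFrame ==> RDD[Row]
rowRdd.map { case Row(id: Int, name: String, age: Int) => s"$name is $age" }
  .collect()
  .foreach(println)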

9、DataFrameRdd2

 

package sparkSQLmook

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Approach 2 for converting an RDD to a DataFrame: build Rows programmatically. Use this when the column types are not known in advance
  */
object DataFrameRdd2 {

  def main(args: Array[String]): Unit = {

      val spark = SparkSession
                   .builder()
                  .appName("DataFrameRdd2")
                   .master("local[2]")
                   .config("spark.sql.warehouse.dir","file:///")
                  .getOrCreate()

      val rdd = spark.sparkContext.textFile("C:/Users/shujuelin/Desktop/spark/infos.txt")

      //1. Convert the RDD to an RDD of Row
      val rddRow = rdd.map(_.split(",")).map(line => Row(line(0).toInt,line(1),line(2).toInt))
      //2. Build the schema (StructType metadata) for the rows
      val structType = StructType(Array(
        StructField("id",IntegerType,true),
        StructField("name",StringType,true),
        StructField("age",IntegerType,true)))
      //Bind the rows to the schema
      val df = spark.createDataFrame(rddRow,structType)
      //df.show()

    //SQL-style programming
      df.createOrReplaceTempView("info")//create a temporary view

      spark.sql("select * from info where age > 20").show()

    spark.stop()
  }

}
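A minimal sketch (same df as above): the programmatic schema can be inspected on the resulting DataFrame, which is a quick way to confirm the Row fields and the StructType line up.

df.printSchema()                                                      // tree view of the schema
df.schema.fields.foreach(f => println(f.name + " : " + f.dataType))   // field-by-field listing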

10、DataFrameOperation

 

package sparkSQLmook

import org.apache.spark.sql
import org.apache.spark.sql.functions._
/**
  * DataFrame API operations
  */
object DataFrameOperation {

  def main(args: Array[String]): Unit = {

    val spark = sql.SparkSession
      .builder()
      .master("local")
      .appName("DataFrameOperation")
      .config("spark.sql.warehouse.dir","file:///")
      .getOrCreate()

    //implicit conversions
    import spark.implicits._
    //val df = spark.read.json("C:/Users/shujuelin/Desktop/spark/people.json")
     val df = spark.read.format("json").load("C:/Users/shujuelin/Desktop/spark/people.json")

    //df.show() //show() displays 20 rows by default
    //df.printSchema()
    //df.select("name").show() //select: a typical weakly-typed (untyped) operation
    // df.select($"name", $"age" + 1).show()  // column expressions in Scala syntax use the $ prefix; adds 1 to age
    //df.filter($"age">19).show()
    //alternative way to write the filter
    //df.filter(df.col("age")>19).show()
    df.select(df.col("name"),(df.col("age")+3).as("age2")).show() //alias the computed column
    //df.groupBy("age").count().show() //group first, then aggregate
    spark.stop()
  }

}
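A minimal sketch putting the functions._ import above to use (same people.json with name/age columns): groupBy combined with explicit aggregate expressions, plus a global aggregation.

df.groupBy("age")
  .agg(count("name").as("cnt"))           // count names per age group
  .show()

df.agg(avg("age"), max("age")).show()     // global aggregation over the whole DataFrame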

11、DataFrameCase

 

package sparkSQLmook

import org.apache.spark.sql.SparkSession

/**
  * DataFrame case study (other API operations)
  */
object DataFrameCase {

  def main(args: Array[String]): Unit = {

      val spark = SparkSession
                  .builder()
                   .appName("DataFrameCase")
                   .master("local[2]")
                   .config("spark.sql.warehouse.dir","file:///")
                   .getOrCreate()

      val rdd = spark.sparkContext.textFile("C://Users//shujuelin//Desktop//spark//student.data")

      import spark.implicits._
     // the | delimiter must be escaped as \\|
    //use reflection (the case class) to convert to a DataFrame
      val infoDF = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt,lines(1),lines(2),lines(3))).toDF()

      infoDF.show(false)//show displays 20 rows by default; use infoDF.show(30,false) for more; false means don't truncate values
      //infoDF.take(10).foreach(println)
      //infoDF.first()//take the first row
      //infoDF.head(3) // take the first three rows
      //infoDF.select("name","phone").show()
      //infoDF.show(20,false)

    //filter rows whose name is empty or the literal string 'NULL'
    //infoDF.filter("name = '' or name = 'NULL'").show()

    //filter rows whose name starts with 's'
    //infoDF.filter("substr(name,0,1) = 's'").show(20,false)

    //sorting
    //sort by name, ascending by default
     //infoDF.sort($"name".desc).show()//or: infoDF.sort(infoDF.col("name").desc).show()

     //infoDF.sort(infoDF.col("name").asc,infoDF.col("id").desc).show(20,false)

     //rename a column
      //infoDF.select(infoDF.col("name").as("student_info")).show(20,false)



     //join operation
    /* val infoDF2 = rdd.map(_.split("\\|")).map(lines => info(lines(0).toInt,lines(1),lines(2),lines(3))).toDF()

     infoDF.join(infoDF2, infoDF.col("id") === infoDF2.col("id")).show(20,false) //inner join by default
*/
      spark.stop()

  }

  case class info(id : Int, name : String, phone : String, email : String)

}
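A minimal sketch extending the commented-out join above (same student.data and info case class): the join type can be passed explicitly, e.g. a left outer join on the same id column.

val infoDF2 = rdd.map(_.split("\\|"))
  .map(lines => info(lines(0).toInt, lines(1), lines(2), lines(3))).toDF()

infoDF.join(infoDF2, infoDF.col("id") === infoDF2.col("id"), "left_outer")
  .show(20, false)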