1. 程式人生 > 資訊 >華虹半導體宣佈 90nm BCD 工藝實現量產:效能高、核心面積小

華虹半導體宣佈 90nm BCD 工藝實現量產:效能高、核心面積小

簡介

Spark SQL是Spark用來處理結構化資料的一個模組,它提供了一個程式設計抽象叫做DataFrame並且作為分散式SQL查詢引擎的作用。

http://spark.apache.org/sql/

為什麼要學習Spark SQL?我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然後提交到叢集上執行,大大簡化了編寫MapReduce的程式的複雜性,由於MapReduce這種計算模型執行效率比較慢。所以Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然後提交到叢集執行,執行效率非常快!同時Spark SQL也支援從Hive中讀取資料。

Spark SQL允許使用SQL或數學的DataFrame API在Spark程式中查詢結構化資料。可用於Java,Scala,Python和R。

Spark SQL的特點:

1.容易整合(整合)

2.統一的資料訪問方式

3.相容Hive

4.標準的資料連線

工作架構

Spark可以分為1個driver(膝上型電腦或者叢集閘道器機器上,使用者編寫的Spark程式)和若干個executor(在RDD分佈的各個節點上)。
通過SparkContext(簡稱sc)連線Spark叢集、建立RDD、累加器(accumlator)、廣播變數(broadcast variables),簡單可以認為SparkContext是Spark程式的根本。
Driver會把計算任務分成一系列小的task,然後送到executor執行。executor之間可以通訊,在每個executor完成自己的task以後,所有的資訊會被傳回。

本圖片來源網路

1. Client客戶端:我們在本地編寫了spark程式,打成jar包,或python指令碼,通過spark submit命令提交到Spark叢集;
2. 只有Spark程式在Spark叢集上執行才能拿到Spark資源,來讀取資料來源的資料進入到記憶體裡;
3. 客戶端就在Spark分散式記憶體中並行迭代地處理資料,注意每個處理過程都是在記憶體中並行迭代完成;注意:每一批節點上的每一批資料,實際上就是一個RDD!!!一個RDD是分散式的,所以資料都散落在一批節點上了,每個節點都儲存了RDD的部分partition。
4. Spark與MapReduce最大的不同在於,迭代式計算模型:MapReduce,分為兩個階段,map和reduce,兩個階段完了,就結束了,所以我們在一個job裡能做的處理很有限; Spark,計算模型,可以分為n個階段,因為它是記憶體迭代式的。我們在處理完一個階段以後,可以繼續往下處理很多個階段,而不只是兩個階段。所以,Spark相較於MapReduce來說,計算模型可以提供更強大的功能。

spark的生態系統:

Mesos和yarn 作用一樣,資源排程平臺,用yarn的比較多
Tachyon:(1)記憶體當中hdfs(記憶體中的分散式儲存系統,加快spark在記憶體中讀取和處理速度)
(2)在不同應用程式之間實現資料共享
spark core:spark的核心,用於離線計算

語法

agg:在整體DataFrame不分組聚合

withColumn:新增額外列方法

join:兩表拼接sc.join(student,sc("sid")===student("sid"), "left").show ,Join有inner,leftouter,rightouter,fullouter,leftsemi,leftanti六種型別

函式返回值

// 1、使用return
def functionName ([引數列表]) : [return type] = {
   function body
   return [expr]
}

// 2、直接把返回值寫在最後:
object Test {
   def main(args: Array[String]) {
        println( "Returned Value : " + addInt(5,7) );
   }
   def addInt( a:Int, b:Int ) : Int = {
      var sum:Int = 0
      sum = a + b

      sum
   }
}
results = spark.sql("SELECT * FROM people")
names = results.map(lambda p: p.name)
results.agg(count(ip).alias(ip_cnt))  
results.withColumn("day_num", lit(1)).join(results, Seq("user"), "leftOuter").select("name", "day_num", "ip_cnt")

Join的例子

package com.sparkbyexamples.spark.dataframe.join

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object JoinExample extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
    (2,"Rose",1,"2010","20","M",4000),
    (3,"Williams",1,"2010","10","M",1000),
    (4,"Jones",2,"2005","10","F",2000),
    (5,"Brown",2,"2010","40","",-1),
      (6,"Brown",2,"2010","50","",-1)
  )
  val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
  import spark.sqlContext.implicits._
  val empDF = emp.toDF(empColumns:_*)
  empDF.show(false)

  val dept = Seq(("Finance",10),
    ("Marketing",20),
    ("Sales",30),
    ("IT",40)
  )

  val deptColumns = Seq("dept_name","dept_id")
  val deptDF = dept.toDF(deptColumns:_*)
  deptDF.show(false)


  println("Inner join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"inner")
    .show(false)

  println("Outer join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"outer")
    .show(false)
  println("full join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"full")
    .show(false)
  println("fullouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"fullouter")
    .show(false)

  println("right join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"right")
    .show(false)
  println("rightouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"rightouter")
    .show(false)

  println("left join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"left")
    .show(false)
  println("leftouter join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftouter")
    .show(false)

  println("leftanti join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftanti")
    .show(false)

  println("leftsemi join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"leftsemi")
    .show(false)

  println("cross join")
  empDF.join(deptDF,empDF("emp_dept_id") ===  deptDF("dept_id"),"cross")
    .show(false)

  println("Using crossJoin()")
  empDF.crossJoin(deptDF).show(false)

  println("self join")
  empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner")
    .select(col("emp1.emp_id"),col("emp1.name"),
      col("emp2.emp_id").as("superior_emp_id"),
      col("emp2.name").as("superior_emp_name"))
      .show(false)

  empDF.createOrReplaceTempView("EMP")
  deptDF.createOrReplaceTempView("DEPT")

  //SQL JOIN
  val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
  joinDF.show(false)

  val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
  joinDF2.show(false)

}

資料

http://spark.apache.org/sql/

https://www.cnblogs.com/lq0310/p/9842078.html

https://sparkbyexamples.com/spark/spark-sql-dataframe-join/

https://www.jianshu.com/p/69bff3c7ec97

-----Spark SQL Join------

https://blog.51cto.com/wangyichao/2351971

https://www.cnblogs.com/duodushuduokanbao/p/9911256.html

https://support-it.huawei.com/docs/zh-cn/fusioninsight-all/developer_guide/zh-cn_topic_0171822912.html