華虹半導體宣佈 90nm BCD 工藝實現量產:效能高、核心面積小
簡介
Spark SQL是Spark用來處理結構化資料的一個模組,它提供了一個程式設計抽象叫做DataFrame並且作為分散式SQL查詢引擎的作用。
為什麼要學習Spark SQL?我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然後提交到叢集上執行,大大簡化了編寫MapReduce的程式的複雜性,由於MapReduce這種計算模型執行效率比較慢。所以Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然後提交到叢集執行,執行效率非常快!同時Spark SQL也支援從Hive中讀取資料。
Spark SQL允許使用SQL或數學的DataFrame API在Spark程式中查詢結構化資料。可用於Java,Scala,Python和R。
Spark SQL的特點:
1.容易整合(整合)
2.統一的資料訪問方式
3.相容Hive
4.標準的資料連線
工作架構
Spark可以分為1個driver(膝上型電腦或者叢集閘道器機器上,使用者編寫的Spark程式)和若干個executor(在RDD分佈的各個節點上)。
通過SparkContext(簡稱sc)連線Spark叢集、建立RDD、累加器(accumlator)、廣播變數(broadcast variables),簡單可以認為SparkContext是Spark程式的根本。
Driver會把計算任務分成一系列小的task,然後送到executor執行。executor之間可以通訊,在每個executor完成自己的task以後,所有的資訊會被傳回。
本圖片來源網路
1. Client客戶端:我們在本地編寫了spark程式,打成jar包,或python指令碼,通過spark submit命令提交到Spark叢集;
2. 只有Spark程式在Spark叢集上執行才能拿到Spark資源,來讀取資料來源的資料進入到記憶體裡;
3. 客戶端就在Spark分散式記憶體中並行迭代地處理資料,注意每個處理過程都是在記憶體中並行迭代完成;注意:每一批節點上的每一批資料,實際上就是一個RDD!!!一個RDD是分散式的,所以資料都散落在一批節點上了,每個節點都儲存了RDD的部分partition。
4. Spark與MapReduce最大的不同在於,迭代式計算模型:MapReduce,分為兩個階段,map和reduce,兩個階段完了,就結束了,所以我們在一個job裡能做的處理很有限; Spark,計算模型,可以分為n個階段,因為它是記憶體迭代式的。我們在處理完一個階段以後,可以繼續往下處理很多個階段,而不只是兩個階段。所以,Spark相較於MapReduce來說,計算模型可以提供更強大的功能。
spark的生態系統:
Mesos和yarn 作用一樣,資源排程平臺,用yarn的比較多
Tachyon:(1)記憶體當中hdfs(記憶體中的分散式儲存系統,加快spark在記憶體中讀取和處理速度)
(2)在不同應用程式之間實現資料共享
spark core:spark的核心,用於離線計算
語法
agg:在整體DataFrame不分組聚合
withColumn:新增額外列方法
join:兩表拼接sc.join(student,sc("sid")===student("sid"), "left").show ,Join有inner
,leftouter
,rightouter
,fullouter
,leftsemi
,leftanti
六種型別
函式返回值
// 1、使用return def functionName ([引數列表]) : [return type] = { function body return [expr] } // 2、直接把返回值寫在最後: object Test { def main(args: Array[String]) { println( "Returned Value : " + addInt(5,7) ); } def addInt( a:Int, b:Int ) : Int = { var sum:Int = 0 sum = a + b sum } }
results = spark.sql("SELECT * FROM people") names = results.map(lambda p: p.name) results.agg(count(ip).alias(ip_cnt)) results.withColumn("day_num", lit(1)).join(results, Seq("user"), "leftOuter").select("name", "day_num", "ip_cnt")
Join的例子
package com.sparkbyexamples.spark.dataframe.join import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.col object JoinExample extends App { val spark: SparkSession = SparkSession.builder() .master("local[1]") .appName("SparkByExamples.com") .getOrCreate() spark.sparkContext.setLogLevel("ERROR") val emp = Seq((1,"Smith",-1,"2018","10","M",3000), (2,"Rose",1,"2010","20","M",4000), (3,"Williams",1,"2010","10","M",1000), (4,"Jones",2,"2005","10","F",2000), (5,"Brown",2,"2010","40","",-1), (6,"Brown",2,"2010","50","",-1) ) val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary") import spark.sqlContext.implicits._ val empDF = emp.toDF(empColumns:_*) empDF.show(false) val dept = Seq(("Finance",10), ("Marketing",20), ("Sales",30), ("IT",40) ) val deptColumns = Seq("dept_name","dept_id") val deptDF = dept.toDF(deptColumns:_*) deptDF.show(false) println("Inner join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"inner") .show(false) println("Outer join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"outer") .show(false) println("full join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"full") .show(false) println("fullouter join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"fullouter") .show(false) println("right join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"right") .show(false) println("rightouter join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"rightouter") .show(false) println("left join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"left") .show(false) println("leftouter join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftouter") .show(false) println("leftanti join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftanti") .show(false) println("leftsemi join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"leftsemi") .show(false) println("cross join") empDF.join(deptDF,empDF("emp_dept_id") === deptDF("dept_id"),"cross") .show(false) println("Using crossJoin()") empDF.crossJoin(deptDF).show(false) println("self join") empDF.as("emp1").join(empDF.as("emp2"), col("emp1.superior_emp_id") === col("emp2.emp_id"),"inner") .select(col("emp1.emp_id"),col("emp1.name"), col("emp2.emp_id").as("superior_emp_id"), col("emp2.name").as("superior_emp_name")) .show(false) empDF.createOrReplaceTempView("EMP") deptDF.createOrReplaceTempView("DEPT") //SQL JOIN val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") joinDF.show(false) val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") joinDF2.show(false) }
資料
http://spark.apache.org/sql/
https://www.cnblogs.com/lq0310/p/9842078.html
https://sparkbyexamples.com/spark/spark-sql-dataframe-join/
https://www.jianshu.com/p/69bff3c7ec97
-----Spark SQL Join------
https://blog.51cto.com/wangyichao/2351971
https://www.cnblogs.com/duodushuduokanbao/p/9911256.html
https://support-it.huawei.com/docs/zh-cn/fusioninsight-all/developer_guide/zh-cn_topic_0171822912.html