spark基礎知識學習第五天(適合新手學習)
Shark
Hive sql -> mr
Shark sql ->Spark core
Spark2.0之前的版本的Spark-SQL並不支援開窗函式和子查詢的
1.Spark SQL1.6.x特點:
(1).記憶體列儲存(不是按照物件儲存的),面向列的儲存方式(減少對記憶體的消耗)
(2).位元組碼生成技術(動態位元組碼生成技術)
(3).程式碼編寫優化
----------------------------------------
1.易整合
2.統一的資料訪問方式
3.相容Hive
4.標準的資料連線
開窗函式:與聚合函式一樣,開窗函式也是對行集組進行聚合計算,但是普通聚合函式每組只能返回一個值,而開窗函式可以每組返回多個值。
開窗函式的呼叫格式為:
函式名(列)over(選項)
over關鍵字表示把函式當成開窗函式而不是聚合函式,SQL標準允許將所有聚合函式用做開窗函式,使用over關鍵字來區分這兩種用法。
-------------------------------------------------------------------------------------------------------
2.Spark SQL是Spark用來處理結構化資料的一個模組,它提供了一個程式設計抽象叫做DataFrame
並且作為分散式SQL查詢引擎的作用。
----------------------------------------------------------------------------------------
3.為什麼要學習Spark SQL
我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然後提交到叢集上執行,
大大簡化了編寫MapReduce的程式的複雜性,由於MapReduce這種計算模型執行效率比較慢
。所有Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然後提交到叢集執行,執行效率非常快!
--------------------------------------------------------------------------------------
4.什麼是DataFrames
與RDD類似,DataFrame也是一個分散式資料容器。然而DataFrame更像傳統資料庫的二維表格,
除了資料以外,還記錄資料的結構資訊,即schema。同時,與Hive類似,DataFrame也支援
巢狀資料型別(struct、array和map)。從API易用性的角度上 看,DataFrame API提供的
是一套高層的關係操作,比函式式的RDD API要更加友好,門檻更低。由於與R和Pandas的
DataFrame類似,SparkDataFrame很好地繼承了傳統單機資料分析的開發體驗。
-------------------------------------------------------------------------------------------
5.生成DataFrame:
DSL:
val seq =Seq(("1","xiaofeng",18),("2","yaoyao",24),("1","xueyan",22)) //建立一個序列集合
val rdd1 = sc.parallelize(seq) //將集合生成RDD
val df =rdd1.toDF("id","name","age") //通過RDD建立dateframe
df.show //相當於action
//查詢單個欄位
valres1 = df.select("name")
res1.show
//查詢多個欄位
df.select("name","age").show
//當條件查詢
df.select("name","age").filter(col("age")>18).show
---------------------------------------------------------------------------------
SQL:
//註冊二維表
df.registerTempTable("t_person") //註冊二維表
sqlContext.sql("select name,age fromt_person").show
//使用sqlContext.sql可以使用sql語句
sqlContext.sql("selectname,age from t_person where age >= 18 limit 2").show
讀取hdfs的檔案
val rdd1 =sc.textFile("hdfs://hadoop01:9000/person.txt") //通過hdfs中的資料生成RDD
case classPerson(id:Long,name:String,age:Int,fv:Int) //使用case class進行型別匹配
val PersonRDD = rdd1.map(_.split(",")).map(x =>Person(x(0).toLong,x(1)),x(2).toInt,x(3).toInt)
//讓資料和型別一一對應
val personDF = personRDD.toDF() //通過RDD建立dateframe
personDF.show //檢視生成的dateframe表格
personDF.select("name","age","fv")
personDF.registerTemTable("t_person")
sqlContext.sql("selectname ,age,fv from t_person where fv > 90").show
//檢視DataFrame結構資訊
personDF.printSchema
//查看錶的資訊(查詢表結構)
sqlContext.sql("desc t_person").show
---------------------------------------------------------------------------------------------------
案例通過反射推斷Schema:
packagecom.qf.gp1704.day10
importorg.apache.spark.sql.{DataFrame, SQLContext}
importorg.apache.spark.{SparkConf, SparkContext}
/**
* 通過反射推斷Schema
*/
objectInferSchema {
def main(args: Array[String]): Unit = {
valconf = newSparkConf().setAppName("InferSchema").setMaster("local")
valsc = new SparkContext(conf)
valsqlContext = new SQLContext(sc)
vallinesRDD =sc.textFile("hdfs://node01:9000/person.txt").map(_.split(","))
// 將RDD和Person關聯
val personRDD = linesRDD.map(p => Person(p(0).toInt, p(1),p(2).toInt, p(3).toInt))
// 呼叫toDF方法需要引入隱式轉換函式
import sqlContext.implicits._
// 將personRDD轉換成DataFrame
valpersonDF: DataFrame = personRDD.toDF()
// 註冊為一張臨時二維表
personDF.registerTempTable("t_person")
valsql = "select * from t_person where fv > 80 order by age desc limit10"
// 呼叫sqlContext例項的sql方法進行查詢
valres: DataFrame = sqlContext.sql(sql)
// 把結果集儲存到HDFS,mode是怎麼儲存(之前寫的資料一樣)。以什麼格式儲存
res.write.mode("append").json("hdfs://node01:9000/out-20180418-1")
注意:2.0版本後添加了很多格式,比如csv格式,如果沒有的格式可以通過新增第三方外掛匯入pom.xml來使用該格式
res.show()
sc.stop()
}
}
case class Person(id: Int, name: String,age: Int, fv: Int) //反射
----------------------------------------------------------------------------
通過StructType指定Schema
packagecom.qf.gp1704.day10
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Row, SQLContext}
importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
importorg.apache.spark.{SparkConf, SparkContext}
/**
* 通過StructType指定Schema
*/
objectStructTypeSchemaDemo {
def main(args: Array[String]): Unit = {
valconf = newSparkConf().setAppName("StructTypeSchemaDemo").setMaster("local")
valsc = new SparkContext(conf)
valsQLContext = new SQLContext(sc)
vallinesRDD =sc.textFile("hdfs://node01:9000/person.txt").map(_.split(","))
// 由StructType指定Schema
val schema:StructType = StructType {
Array(
StructField("id",IntegerType, false),
StructField("name",StringType, true),
StructField("age",IntegerType, true),
StructField("fv",IntegerType, true)
)
}
// 對映,通過Row可以封裝多個值,類似於元組封裝不同型別的值
val rowRDD:RDD[Row] = linesRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt, p(3).toInt))
// 轉換成DataFrame
val personDF:DataFrame = sQLContext.createDataFrame(rowRDD, schema)
// 註冊成臨時表
personDF.registerTempTable("t_person")
valsql = "select * from t_person"
val res = sQLContext.sql(sql)
res.write.mode("append").json("c://person")
sc.stop()
}
}
-------------------------------------------------------------------------------------------------------
案例:將資料匯入到Mysql中
packagecom.qf.gp1704.day10
importjava.util.Properties
importorg.apache.spark.{SparkConf, SparkContext}
importorg.apache.spark.sql.{Row, SQLContext}
importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
objectInsertData2MySQL {
def main(args: Array[String]): Unit = {
valconf = newSparkConf().setAppName("InsertData2MySQL")//.setMaster("local")
valsc = new SparkContext(conf)
valsQLContext = new SQLContext(sc)
vallinesRDD =sc.textFile("hdfs://node01:9000/test.txt").map(_.split(","))
val schema = StructType{Array(
StructField("name", StringType, true),
StructField("age",IntegerType, true)
)}
val personRDD = linesRDD.map(p => Row(p(0), p(1).toInt))
val personDF = sQLContext.createDataFrame(personRDD, schema)
//請求MySQL的一些配置資訊
val prop = newProperties()
prop.put("user","root")
prop.put("password","root")
prop.put("driver","com.mysql.jdbc.Driver")
val url ="jdbc:mysql://node03:3306/bigdata"
val table ="person"
//把資料寫入MySQL,使用write方法並通過追加方式以jdbc連線的方式將資料
//存入MySQL中,jdbc中的三個引數分別是,url,表名,配置資訊
personDF.write.mode("append").jdbc(url,table, prop)
sc.stop()
}
}
------------------------------------------------------------------------------------------------------------------
6.JDBC
Spark SQL可以通過JDBC從關係型資料庫中讀取資料的方式建立DataFrame,通過對DataFrame一系列的計算後,
還可以將資料再寫回關係型資料庫中。
(1).從MySQL中載入資料(Spark Shell方式)
1.啟動Spark Shell,必須指定mysql連線驅動jar包
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell\
--masterspark://node01:7077 \
--jars/usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
--driver-class-path/usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar
//這裡需要載入兩次jar包,第一次是需要jar包,第二次是需要驅動類
2.從mysql中載入資料,生成DataFrame,以jdbc的方式讀取
valjdbcDF = sqlContext.read.format("jdbc").options(Map("url"-> "jdbc:mysql://node03:3306/bigdata", "driver" ->"com.mysql.jdbc.Driver", "dbtable" ->"person", "user" -> "root","password" -> "root")).load()
3.執行查詢
jdbcDF.show() //show方法底層呼叫的還是println方法
-----------------------------------------------------------------------------------------------------------------------
4.將資料寫入到MySQL中(打jar包方式)
1.編寫Spark SQL程式
packagecom.qf.spark.sql
importjava.util.Properties
importorg.apache.spark.sql.{SQLContext, Row}
importorg.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
importorg.apache.spark.{SparkConf, SparkContext}
objectJdbcRDD {
def main(args: Array[String]) {
valconf = new SparkConf().setAppName("MySQL-Demo")
valsc = new SparkContext(conf)
valsqlContext = new SQLContext(sc)
//通過並行化建立RDD
valpersonRDD = sc.parallelize(Array("1 tom 5", "2 jerry 3","3 kitty 6")).map(_.split(" "))
//通過StructType直接指定每個欄位的schema
valschema = StructType(
List(
StructField("id",IntegerType, true),
StructField("name",StringType, true),
StructField("age",IntegerType, true)
)
)
//將RDD對映到rowRDD
valrowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
//將schema資訊應用到rowRDD上
valpersonDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//建立Properties儲存資料庫相關屬性
valprop = new Properties()
prop.put("user","root")
prop.put("password","123456")
//將資料追加到資料庫
personDataFrame.write.mode("append").jdbc("jdbc:mysql://192.168.10.1:3306/bigdata","bigdata.person", prop)
//停止SparkContext
sc.stop()
}
}
2.用maven將程式打包
3.將Jar包提交到spark叢集
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit\
--classcom.qf.spark.sql.JdbcRDD \
--masterspark://node01:7077 \
--jars/usr/local/spark-1.6.1bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
--driver-class-path/usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
/root/spark-mvn-1.0-SNAPSHOT.jar
-------------------------------------------------------------------------------------------------------------------------
6.Hive-On-Spark
在SparkSQL之後出現的,主要是和Hive-On-Tez進行競爭
配置:
(1).將Hadoop的/etc/hadoop下的配置檔案core-site.xml複製到Spark的conf目錄
//主要獲取hdfs上的資料
(2).將Hive的conf目錄下的hive-site.xml複製到Spark的conf目錄
//獲取元資料資訊
啟動:./spark-sql --master spark://hadoop01:7077 --
-----------------------------------------------------------------------------------------------------
7.Kafka:(訊息中介軟體)(開源訊息系統)----scala寫成---相當於微信公眾號
傳遞生產者和消費者之間的資訊的橋樑,一般用來快取資料
是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。
Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。
預設存168小時(一週)
-------------------------------------------------------------------------------------------------------------------
8.JMS的基礎
(1).JMS是什麼:JMS是Java提供的一套技術規範
JMS即Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,
用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務是一個與具體平臺無關的API,
絕大多數MOM提供商都對JMS提供支援。
(2).JMS幹什麼用:用來異構系統 整合通訊,緩解系統瓶頸,提高系統的伸縮性增強系統使用者體驗,使得系統模組化和元件化變得可行並更加靈活
(3).JMS訊息傳輸模型
l 點對點模式(一對一,消費者主動拉取資料(pull),訊息收到後訊息清除)
點對點模型通常是一個基於拉取或者輪詢的訊息傳送模型,這種模型從佇列中請求資訊,而不是將訊息推送到客戶端。這個模型的特點是傳送到佇列的訊息被一個且只有一個接收者接收處理,即使有多個訊息監聽者也是如此。
l 釋出/訂閱模式(一對多,資料生產後,推送給所有訂閱者push)
釋出訂閱模型則是一個基於推送的訊息傳送模型。釋出訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收訊息,而持久訂閱者則監聽主題的所有訊息,即當前訂閱者不可用,處於離線狀態。
(4).JMS核心元件
l Destination:訊息傳送的目的地,也就是前面說的Queue和Topic。
l Message :從字面上就可以看出是被髮送的訊息。
l Producer: 訊息的生產者,要傳送一個訊息,必須通過這個生產者來發送。
l MessageConsumer: 與生產者相對應,這是訊息的消費者或接收者,通過它來接收一個訊息。
(5).常見的類JMS訊息伺服器:
ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的
分散式訊息中介軟體 Metamorphosis:
Metamorphosis (MetaQ) 是一個高效能、高可用、可擴充套件的分散式訊息中介軟體,類似於LinkedIn的Kafka,具有訊息儲存順序寫、吞吐量大和支援本地和XA事務等特性,適用於大吞吐量、順序訊息、廣播和日誌資料傳輸等場景,在淘寶和支付寶有著廣泛的應用,現已開源。
分散式訊息中介軟體 RocketMQ--- (MetaQ) 的3.0版本
-----------------------------------------------------------------------------------------------------------------
9.為什麼需要訊息佇列(重要、瞭解)
訊息系統的核心作用就是三點:解耦,非同步和並行
以使用者註冊的案列來說明訊息系統的作用
-------------------------------------------------------------------------------------------------
10.Kafka核心元件(重要)
l Topic :訊息根據Topic進行歸類
l Producer:傳送訊息者
l Consumer:訊息接受者
l broker:每個kafka例項(server)
l Zookeeper:依賴叢集儲存meta資訊。
生產者可以隨意將資料傳遞給Kafka叢集中的各個分割槽中
Kafka叢集中將資料通過訊息佇列(先進先出,單向傳遞)儲存資料
消費者叢集向kafka叢集請求資料
當消費者某一叢集掛掉後會讓其他叢集拿到掛掉叢集所要分配到的資料,會出現資料偏移問題
兩個叢集同時請求一個數據的時候,會出現執行緒等待問題
每個消費者叢集都能隨便拿到kafka中執行緒開啟的資料
角色:
Producer叢集:
1、生產者負責獲取資料並把資料傳到Kafka的,比如flume、logstash,
生產者會監控一個目錄負責把資料採集到Kafka
2、生產者叢集是由多個程序組成,一個生產者作為一個獨立的程序
3、多個生產者傳送的資料是可以存到同一個topic的同一個partition的
4、一個生產者的資料可以放到多個topic中
5、單個生產者就具有資料分發的能力
Kafka叢集:
1、Kafka叢集可以儲存多種資料型別的資料,一種資料型別可以儲存到一個topic中
一個Kafka叢集中可以建立多個topic
2、每個topic可以建立多個分割槽和多個副本,分割槽的數量和副本的數量是在建立topic時指定的,後期也可以執行相應的命令更改分割槽數和副本數
3、每個分割槽的資料是由多個segment組成,一個segment檔案裡有多個index檔案和對應的資料檔案(.log)組成
4、一個topic的分割槽資料可以有多個備份(副本),原始資料和副本資料不可以在同一個節點上
Consumer叢集:
1、消費者負責拉取資料,比如:SparkStreaming、Storm
2、一個ConsumerGroup稱為Consumer叢集
3、新增或減少Consumer成員時會觸發Consumer叢集的負載均衡
4、ConsumerGroup可以消費一個或多個分割槽的資料,相反,一個分割槽的資料同一時刻只能被一個Consumer來消費
5、Consumer成員之間 消費的資料各不相同,在同一個group中資料不可以重複消費
Producer的寫入流程:
只有pull過程全部結束,每個副本都發送了ack後,這個資料才能是消費的,這是一整個過程,必須將資料傳給leader,然後follower從leader中拉(pull)資料,然後寫入到followers完成後向leader傳送ack表示過程已經完成。leader收到所有副本的ack後向peoducer傳送ack