1. 程式人生 > >【Spark深入學習 -16】官網學習SparkSQL

【Spark深入學習 -16】官網學習SparkSQL

客戶 .com pmu 參考資料 一行 uap lsa bmi orb

----本節內容-------
1.概覽
1.1 Spark SQL
1.2 DatSets和DataFrame
2.動手幹活
2.1 契入點:SparkSession
2.2 創建DataFrames
2.3 非強類型結果集操作
2.4 程序化執行SQL查詢
2.5 全局臨時視圖
2.6 創建DataSets
2.7 與RDD交互操作
2.8 聚集函數
3.Spark數據源
3.1 通用Load/Save函數
3.2 Parquets文件格式

3.2.1 讀取Parquet文件
3.2.2 解析分區信息
3.2.3 Schema合並
3.2.4 Hive元與Parquet表轉換
3.3 JSON數據集
3.4 Hive表
3.5 JDBC連接其他庫
4 性能調優
4.1 緩存數據至內存
4.2 調優參數
5 分布式SQL引擎
5.1 運行Thrift JDBC/ODBC服務
5.2 運行Spark SQL CLI
6.參考資料
---------------------

最近好幾個好友和Spark官方文檔杠上了,準備共同整理一下Spark官方文檔,互相分享研究心得故有此一篇。本次參考官網的說明,著重介紹SparkSQL,結合官網提供的重要內容以及自己的理解做一次學習筆記,主要是針對spark2.0的官方文檔,本文不是對官網文檔的翻譯,但是主要參考內容來自官方文檔。

技術分享

1.概覽

技術分享Spark SQL是Spark的一個組件,能夠很好的處理結構化數據。Spark SQL記錄了更多數據結構化信息, 所以相比RDD,可以更好的處理結構化數據,並且具有更好的性能(Spark SQL都記錄了啥信息這麽能幹,舉個簡單的例子,一般的數據庫讀表中某個字段的數據,先拿到字段內容,然後還要去元數據表獲得這個字段表示什麽含義,來來回回的查,效率低,Spark SQL就不需要這樣,他一拿到字段內容就知道是什麽意思,因為他記錄了字段的含義,不需要去查元數據表來感知這個字段什麽含義,效率就提升了)。可以使用 SQL或者DataSet與Spark SQL進行交互。不管你使用Java、Scala還是python語言,Spark SQL底層計算引擎都是一樣的,所以支持很多語言開發,隨你挑,隨你用什麽語言開發。

1.1 Spark SQL
Spark SQL無縫集成Hive的sql語法,只需要做一些簡單的配置,怎麽配置,自行百度。通過SQL查詢返回的結果是Dataset/DataFrame,也支持命令行或者JDBC的方式連接Spark SQL.
1.2 DatSets和DataFrame
官網真的很啰嗦,嘮嘮叨叨的,這一段其實就講了這麽幾個要點
1)DataSet在spark1.6開始支持
2)DataFrame是一個分布式的數據集合,該數據集合以命名列的方式進行整合。DataFrame可以理解為關系數據庫中的一張表,也可以理解為R/Python中的一個data frame。DataFrames可以通過多種數據構造,例如:結構化的數據文件、hive中的表、外部數據庫、Spark計算過程中生成的RDD等。
3)DataFrame的API支持4種語言:Scala、Java、Python、R。但是對python和R支持的不是很好,對Java和scala支持很好,


2.動手幹活

本例子演示如何創建SparkSession,創建DataFrames,操作Dataset
啟動Spark shell,
命令: bin/spark-shell --master spark://master01:7077
----------------------
import org.apache.spark.sql.SparkSession
import spark.implicits._
val spark=SparkSession.builder().appName("test").config("spark.some.config.option","some-value").getOrCreate();
val df = spark.read.json("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json");
df.show()
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
----------------------
2.1 契入點:SparkSession
入口點是SparkSession類,使用SparkSession.builder()可以獲得SparkSession對象
技術分享2.2 創建DataFrames
使用創建好的sparkSession來創建DataFrames,DataFrame可以來自RDD,或者Spark數據源
如hive,或則其他數據源,下面這個例子是讀取json數據,數據源格式,people.json在spark自帶文件夾內examples/src/main/resources/people.json
--------------------------
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
--------------------------
技術分享2.3 非強類型結果集操作
DataFrame在Scala、Java、Python和R中為結構化數據操作提供了一個特定領域語言支持。在Spark2.0中,在Scala和Java的API中,DataFrame僅僅是Dataset的RowS表示。與Scala/Java中的強類型的“帶類型轉換操作”相比,這些操作也可以看做“無類型轉換操作”。
打印出來表結構
技術分享打印列名為name的所有列內容
技術分享

選擇name和age列,並且age都加1
技術分享過濾出age大於21的記錄
技術分享

根據年齡分組,並統計個數
技術分享

除了簡單的列引用和表達式外,Dataset同時有豐富的函數庫,包括字符串操作、日期算法、常用數學操作等。完整的列表可參考DataFrame Function Reference。
2.4 程序化執行SQL查詢
Sparksession中的sql函數使得應用可以編程式執行SQL查詢語句並且已DataFrame形式返回:
技術分享
2.5 全局臨時視圖
臨時視圖是基於session級別的,創建視圖的session一旦掛掉臨時視圖的生命也就到此為止了,使用全局視圖,可以避免這樣的慘劇發生。
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()
技術分享
2.6 創建DataSets
Dataset與RDD很像,不同的是它並不使用Java序列化或者Kryo,而是使用特殊的編碼器來為網絡間的處理或傳輸的對象進行序列化。對轉換一個對象為字節的過程來說編碼器和標準系列化器都是可靠的,編碼器的代碼是自動生成並且使用了一種格式,這種格式允許Spark在不需要將字節解碼成對象的情況下執行很多操作,如filtering、sorting和hashing等。
case class Person(name: String, age: Long)
//創建一個Person然後轉化為DataSet
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
技術分享
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect()

技術分享//換成你本地spark安裝路徑即可
val path = "file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show()
技術分享備註說明:
1)case 類在scala2.1.0最多支持22個字段
2)編碼器默認導入spark.implicits._
3)通過制定類名,DataFrame可以自動轉為DataSet

2.7 與RDD交互操作
Spark SQL支持兩種將已存在的RDD轉化為Dataset的方法。第一種方法:使用反射推斷包含特定類型對象的RDD的結構。這種基於反射的方法代碼更加簡潔,並且當你在寫Spark程序的時候已經知道RDD的結構的情況下效果很好。第二種方法:創建Dataset的方法是通過編程接口建立一個結構,然後將它應用於一個存在的RDD。雖然這種方法更加繁瑣,但它允許你在運行之前不知道其中的列和對應的類型的情況下構建Dataset。

使用反射推斷結構
Spark SQL的Scala接口支持自動的將一個包含case class的RDD轉換為DataFrame。這個case class定義了表結構。Caseclass的參數名是通過反射機制讀取,然後變成列名。Caseclass可以嵌套或者包含像Seq或Array之類的復雜類型。這個RDD可以隱式的轉換為一個DataFrame,然後被註冊為一張表。這個表可以隨後被SQL的statement使用。
-----------------------
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.Encoder
import spark.implicits._
//讀取txt文件->切分文件->將切分後的內容作為參數傳遞給Person類構建對象->轉為dataset
val peopleDF=spark.sparkContext.textFile("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
//註冊一個dataset臨時視圖
peopleDF.createOrReplaceTempView("people")
//使用sql執行標準sql語句,這裏的name和age是Person類的成員對象,要保持一致
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 20")
//使用map進行轉換,teenager 是一個數組,根據下標取得數據
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Age: " + teenager(1)).show()
//根據列名取數
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
//dataset要進行map操作,要先定義一個Encoder,不支持map會給升級帶來較大麻煩
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
技術分享
-----------people.txt數據-----------

技術分享
通過編程接口指定Schema
當JavaBean不能被預先定義的時候(比如不同用戶解析同一行,解析結果字段可能就不同),編程創建DataFrame分為三步:
● 從原來的RDD創建一個Row格式的RDD
● 創建與RDD中Rows結構匹配的StructType,通過該StructType創建表示RDD的Schema
● 通過SparkSession提供的createDataFrame方法創建DataFrame,方法參數為RDD的Schema
----------------------------------
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
//定義一個shema列名,string類型
val schemaString = "name age"
//根據schema列名生成schema,通過StructType方式生成schema
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields)
//將RDD記錄轉為RowS 形式
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
// 創建dataFrame,將schema和文件內容RDD結合在一起了 val peopleDF = spark.createDataFrame(rowRDD, schema)
//創建臨時視圖
peopleDF.createOrReplaceTempView("people")
//執行sql
val results = spark.sql("SELECT name FROM people")
results.map(attributes => "Name: " + attributes(0)).show()
技術分享
2.8 聚集函數
DataFrames內置了常見的聚合函數,如min,max,count,distinct等,都是為DataFrame,用戶也可以定義自己的聚集函數
------------------
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
def dataType: DataType = DoubleType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble/buffer.getLong(1)
}

//////////////////////////////////////////////////
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/employe.json")
df.createOrReplaceTempView("employees") df.show()

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show()

備註:
1)employe.json,這個文件在我的網盤下,spark沒有自帶該文件
鏈接:http://pan.baidu.com/s/1bpqzII7 密碼:kuyv
2)如果你是file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/employe.json方式讀取本地文件,你得將employe.json分發到各個節點指定的目錄examples/src/main/resources/employe下面,否則會報錯。

3.Spark數據源
Spark SQL通過DataFrame接口,可以支持對多種數據源的操作。DataFrame可以使用關系轉換來進行操作,而且可以用來創建臨時視圖。將DataFrame註冊為臨時視圖可以允許你在數據上運行SQL查詢語句。本節講解使用SparkData Source加載數據和保存數據的通用方法,然後
詳細講述內部支持的數據源可用的特定操作。
3.1 通用Load/Save函數
Spark默認數據源格式將被用於所有的操作,默認是parquet文件格式,使用spark.sql.sources.default指定默認文件格式
--------------------
val usersDF = spark.read.load("/tmp/namesAndAges.parquet") usersDF.select("name", "age").write.save("namesAndAgestest.parquet")
--------------------
手動指定文件格式
你可以手動指定數據源以及數據源附帶的額外選項。數據源被他們的完全限定名來指定(如,org.apache.spark.sql.parquet),但對於內部支持的數據源,你可以使用短名(json,parquet,jdbc)。DataFrame可以使用這種語法從任何可以轉換為其他類型的數據源加載數據。
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("parquet").save("/tmp/namesAndAges.parquet")
在文件上直接執行SQL
除了使用讀取API加載一個文件到DATAFrame然後查詢它的方式,你同樣可以通過SQL直接查詢文件。
val sqlDF=spark.sql("SELECT name FROM parquet.`/tmp/namesAndAges.parquet`")
技術分享保存模式
保存操作可選SaveMode,它指定了如何處理現有的數據。需要重視的一點是這些保存模式沒有使用任何的鎖,並且不具有原子性。此外,當執行Overwrite時,數據將先被刪除,然後寫出新數據。
(1)Overwrite
如果有文件存在,新內容會覆蓋原有內容
---------------
import org.apache.spark.sql.SaveMode
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
//覆蓋模式,原來有文件存在會先刪除,在寫入
peopleDF.select("name", "age").write.mode(SaveMode.Overwrite).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
技術分享(2)Append
如果文件存在,就在原有的文件中追加新增內容
----------------------
import org.apache.spark.sql.SaveMode
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
//覆蓋模式,原來有文件存在會先刪除,在寫入
peopleDF.select("name", "age").write.mode(SaveMode.Append).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
技術分享(3)Ignore
如果有文件存在, 則不發生任何事情,和create table if not exists 一樣的功能
peopleDF.select("name", "age").write.mode(SaveMode.Ignore).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
技術分享(4)ErrorIfExists
如果文件存在,就報錯,默認就是這個模式
如果有文件存在, 則不發生任何事情,和create table if not exists 一樣的功能
peopleDF.select("name", "age").write.mode(SaveMode.ErrorIfExists).save("/tmp/test1.parquet")
//讀取剛剛寫入程序
val sqlDF=spark.sql("SELECT * FROM parquet.`/tmp/test1.parquet`").show()
技術分享保存數據到hive表
可以通過saveAsTable方法將DataFrames存儲到表中,現有的hive版本不支持該功能。與registerTempTable方法不同的是,saveAsTable將DataFrame中的內容持久化到表中,並在HiveMetastore中存儲元數據。存儲一個DataFrame,可以使用SQLContext的table方法。table先創建一個表,方法參數為要創建的表的表名,然後將DataFrame持久化到這個表中。
默認的saveAsTable方法將創建一個“managed table”,表示數據的位置可以通過metastore獲得。當存儲數據的表被刪除時,managed table也將自動刪除。
目前,saveAsTable不支持一外表的方式將dataframe內容保存到外表,需要打一個patch才能實現
從spark2.1開始,持久化源數據到表中的元數據,在hive中也可以進行分區存儲,這樣
· 查詢時只需要返回需要的分區數據,不需要查詢全部分區數據
· DDL語句,如ALTER TABLE PARTITION ... SET LOCATION,這樣的語句可以使用Datasource API來實現。

3.2 Parquets文件格式
3.2.1 讀取Parquet文件(Loading Data Programmatically)
Parquet是一種支持多種數據處理列存儲數據格式,Parquet文件中保留了原始數據的模式。Spark SQL提供了Parquet文件的讀寫功能。
讀取Parquet文件示例如下:
---------------
import spark.implicits._
val peopleDF = spark.read.format("json").load("file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")
peopleDF.write.parquet("/tmp/people.parquet")
val parquetFileDF = spark.read.parquet("people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile") val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19") namesDF.map(attributes => "Name: " + attributes(0)).show()
技術分享--------------------
3.2.2 解析分區信息
對表進行分區是對數據進行優化的方式之一。在分區的表內,數據通過分區列將數據存儲在不同的目錄下。Parquet數據源現在能夠自動發現並解析分區信息。例如,對人口數據進行分區存儲,分區列為gender和country,使用下面的目錄結構:

技術分享通過傳遞path/to/table給SparkSession.read.parquet or SparkSession.read.load,Spark SQL將自動解析分區信息。返回的DataFrame的Schema如下:
技術分享需要註意的是,數據的分區列的數據類型是自動解析的。當前,支持數值類型和字符串類型。自動解析分區類型的參數為:spark.sql.sources.partitionColumnTypeInference.enabled,默認值為true。如果想關閉該功能,直接將該參數設置為disabled。此時,分區列數據格式將被默認設置為string類型,不再進行類型解析。註意要解析的路徑寫法問題,是寫相對路徑還是絕對路徑,
3.2.3 Schema合並
像ProtocolBuffer、Avro和Thrift那樣,Parquet也支持Schema evolution(Schema演變)。用戶可以先定義一個簡單的Schema,然後逐漸的向Schema中增加列描述。通過這種方式,用戶可以獲取多個有不同Schema但相互兼容的Parquet文件。現在Parquet數據源能自動檢測這種情況,並合並這些文件的schemas。
因為Schema合並是一個高消耗的操作,在大多數情況下並不需要,所以Spark SQL從1.5.0開始默認關閉了該功能。可以通過下面兩種方式開啟該功能:
● 當數據源為Parquet文件時,將數據源選項mergeSchema設置為true
● 設置全局SQL選項spark.sql.parquet.mergeSchema為true
----------------
import spark.implicits._
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("/tmp/test_table1/key=1")
squaresDF .printSchema()

val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("/tmp/test_table1/key=2")
cubesDF.printSchema()
val mergedDF = spark.read.option("mergeSchema", "true").parquet("/tmp/test_table1")
mergedDF.printSchema()
技術分享
3.2.4 Hive metastore Parquet表轉換
Spark SQL緩存了Parquet元數據以達到良好的性能。當Hive metastore Parquet表轉換為enabled時,表修改後緩存的元數據並不能刷新。所以,當表被Hive或其它工具修改時,則必須手動刷新元數據,以保證元數據的一致性。示例如下:
--------------
sqlContext.refreshTable("my_table")
--------------
配置Parquet可以使用SparkSession的setConf方法或使用SQL執行SET key=value命令。詳細參數說明如下:
技術分享3.3 JSON數據集
Spark SQL能自動解析JSON數據集的Schema,讀取JSON數據集為DataFrame格式。讀取JSON數據集方法為SQLContext.read().json()。該方法將String格式的RDD或JSON文件轉換為DataFrame。
需要註意的是,這裏的JSON文件不是常規的JSON格式。JSON文件每一行必須包含一個獨立的、自滿足有效的JSON對象。如果用多行描述一個JSON對象,會導致讀取出錯。讀取JSON數據集示例如下:
--------------- val path = "file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json" val peopleDF = spark.read.json(path) peopleDF.printSchema() peopleDF.createOrReplaceTempView("people") val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show()
技術分享val otherPeopleRDD = spark.sparkContext.makeRDD("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = spark.read.json(otherPeopleRDD)
otherPeople .printSchema() otherPeople.show()

技術分享

3.4 Hive表
------------------
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession case class Record(key: Int, value: String) val warehouseLocation = "/user/hive/warehouse" val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate() import spark.implicits._
import spark.sql sql("CREATE TABLE IF NOT EXISTS src(key INT, value STRING)") sql("LOAD DATA LOCAL INPATH ‘file:///usr/local/hadoop/spark/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/kv1.txt‘ INTO TABLE src") sql("SELECT * FROM src").show()
技術分享通過hive可以發下新增的src表
技術分享sql("SELECT COUNT(*) FROM src").show() val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
sqlDF .show() // The items in DaraFrames are of type Row, which allows you to access each column by ordinal. val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
技術分享// You can also use DataFrames to create temporary views within a SparkSession. val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i"))) recordsDF.createOrReplaceTempView("records") sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
技術分享-------------
備註:
1)將hive-site.xml拷貝到spark conf目錄下,所有節點都要有
2)將mysql驅動包分發到spark所有節點的jar目錄下
3.4.1 訪問不同版本的Hive Metastore
Spark SQL經常需要訪問Hive metastore,Spark SQL可以通過Hive metastore獲取Hive表的元數據。從Spark 1.4.0開始,Spark SQL只需簡單的配置,就支持各版本Hive metastore的訪問。註意,涉及到metastore時Spar SQL忽略了Hive的版本。Spark SQL內部將Hive反編譯至Hive 1.2.1版本,Spark SQL的內部操作(serdes, UDFs, UDAFs, etc)都調用Hive 1.2.1版本的class。版本配置項見下面表格:
技術分享3.5 JDBC連接其他庫

bin/spark-shell --driver-class-path jars/ojdbc14.jar --jars jars/ojdbc14.jar --master spark://master01:7077
----------------------
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.1.121:3306/sjh") .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "dataArrive").option("user", "maptest") .option("password", "maptest789").load()
jdbcDF.show()
技術分享//創建臨時視圖
jdbcDF.createOrReplaceTempView("dataArrvie")
//執行sql
val results = spark.sql("SELECT * FROM dataArrvie")
results.map(attributes => "JobaName: " + attributes(0)).show()
技術分享----------------------寫數據到mysql
import java.util.Properties

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://192.168.1.121:3306/sjh") .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "ApplyJob").option("user", "maptest") .option("password", "maptest789").load()
jdbcDF.createOrReplaceTempView("ApplyJob")
//執行sql
val results = spark.sql("SELECT * FROM ApplyJob")

results.write.mode("append").jdbc("jdbc:mysql://192.168.1.121:3306/sjh", "ApplyJob2", connectionProperties)
故障排除(Troubleshooting)
● 在客戶端session和所有的executors上,JDBC driver必須對啟動類加載器(primordial class loader)設置為visible。因為當創建一個connection時,Java的DriverManager類會執行安全驗證,安全驗證將忽略所有對啟動類加載器為非visible的driver。一個很方便的解決方法是,修改所有worker節點上的compute_classpath.sh腳本,將driver JARs添加至腳本。
● 有些數據庫(例:H2)將所有的名字轉換為大寫,所以在這些數據庫中,Spark SQL也需要將名字全部大寫。


4 性能調優
4.1 緩存數據至內存
Spark SQL可以通過調用spark.catalog.cacheTable("tableName") 或者dataFrame.cache(),將表用列式存儲格式( an in&shy;memory columnar format)緩存至內存中。然後Spark SQL在執行查詢任務時,只需掃描必需的列,從而以減少掃描數據量、提高性能。通過緩存數據,Spark SQL還可以自動調節壓縮,從而達到最小化內存使用率和降低GC壓力的目的。調用sqlContext.uncacheTable("tableName")可將緩存的數據移出內存。可通過兩種配置方式開啟緩存數據功能:
● 使用SparkSession的setConf方法
● 執行SQL命令 SET key=value
技術分享4.2 調優參數
可以通過配置下表中的參數調節Spark SQL的性能。在後續的Spark版本中將逐漸增強自動調優功能,下表中的參數在後續的版本中或許將不再需要配置。

技術分享


5 分布式SQL引擎
使用Spark SQL的JDBC/ODBC或者CLI,可以將Spark SQL作為一個分布式查詢引擎。終端用戶或應用不需要編寫額外的代碼,可以直接使用Spark SQL執行SQL查詢。
5.1 運行Thrift JDBC/ODBC服務
這裏運行的Thrift JDBC/ODBC服務與Hive 1.2.1中的HiveServer2一致。可以在Spark目錄下執行如下命令來啟動JDBC/ODBC服務,
命令:./sbin/start-thriftserver.sh
這個命令接收所有 bin/spark-submit 命令行參數,添加一個 --hiveconf 參數來指定Hive的屬性。詳細的參數說明請執行
命令: ./sbin/start-thriftserver.sh --help
服務默認監聽端口為localhost:10000。有兩種方式修改默認監聽端口:
● 修改環境變量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \ ...
● 修改系統屬性
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri> ...

使用 beeline 來測試Thrift JDBC/ODBC服務:
./bin/beeline
連接到Thrift JDBC/ODBC服務
beeline> !connect jdbc:hive2://localhost:10000
在非安全模式下,只需要輸入機器上的一個用戶名即可,無需密碼。在安全模式下,beeline會要求輸入用戶名和密碼。安全模式下的詳細要求,請閱讀beeline documentation的說明。
配置Hive需要替換 conf/ 目錄下的 hive-site.xml。
Thrift JDBC服務也支持通過HTTP傳輸發送thrift RPC messages。開啟HTTP模式需要將下面的配參數配置到系統屬性或 conf/: 下的 hive-site.xml中
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice
測試http模式,可以使用beeline鏈接JDBC/ODBC服務:
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
5.2 運行Spark SQL CLI
Spark SQL CLI可以很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。需要註意的是,Spark SQL CLI不能與Thrift JDBC服務交互。
在Spark目錄下執行如下命令啟動Spark SQL CLI:
./bin/spark-sql
配置Hive需要替換 conf/ 下的 hive-site.xml 。
執行 ./bin/spark-sql --help 可查看詳細的參數說明 。

6.參考資料
1)http://spark.apache.org/docs/latest/sql-programming-guide.html
Spark SQL官方網站
2) http://www.cnblogs.com/BYRans/p/5057110.html
3).http://www.cnblogs.com/BYRans/p/5057110.html
4).http://blog.csdn.net/yhao2014/article/details/52215966
5).http://www.tuicool.com/articles/yEZr6ve case class與普通class區別

【Spark深入學習 -16】官網學習SparkSQL