Spark SQL Learning (Getting to Know Spark SQL)
A first look at Spark SQL
Spark SQL is a Spark module for processing structured data. The core programming abstraction it provides is the DataFrame.
A DataFrame can be built from many sources, including structured data files, Hive tables, external relational databases, and existing RDDs, as sketched below.
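A minimal sketch of those source types (Spark 1.x API, as used throughout this post; the paths, table name, and JDBC settings are made-up placeholders):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext

// 1) Structured data file (JSON, Parquet, ...)
val fromFile = sqlContext.read.json("hdfs://master:9000/path/to/data.json")

// 2) Hive table (requires a HiveContext on a Hive-enabled Spark build)
// val fromHive = hiveContext.table("some_db.some_table")

// 3) External relational database, via the JDBC data source
val fromJdbc = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/test")
  .option("dbtable", "students")
  .option("user", "root")
  .option("password", "secret")
  .load()

// 4) Existing RDD -- see the two conversion methods later in this post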
Creating a DataFrame
Data file students.json (note: Spark's JSON data source expects one complete JSON object per line):
{"id":1, "name":"leo", "age":18}
{"id":2, "name":"jack", "age":19}
{"id":3, "name":"marry", "age":17}
Creating a DataFrame in spark-shell
//Upload the file to the HDFS directory
hadoop@master:~/wujiadong$ hadoop fs -put students.json /student/2016113012/spark

//Start spark-shell
hadoop@master:~$ spark-shell

//Import SQLContext
scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

//Declare a SQLContext object for operating on the data
scala> val sql = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@...

//Read the data
scala> val students = sql.read.json("hdfs://master:9000/student/2016113012/spark/students.json")
students: org.apache.spark.sql.DataFrame = [age: bigint, id: bigint ... 1 more field]

//Show the data
scala> students.show
+---+---+-----+
|age| id| name|
+---+---+-----+
| 18|  1|  leo|
| 19|  2| jack|
| 17|  3|marry|
+---+---+-----+
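The deprecation warning above appears because, from Spark 2.0 onwards, SQLContext is superseded by SparkSession. A minimal sketch of the same read in a 2.x shell, where a prebuilt session named spark is available (same HDFS path assumed):

scala> val students = spark.read.json("hdfs://master:9000/student/2016113012/spark/students.json")
scala> students.show()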
Common DataFrame operations
scala> students.show
+---+---+-----+
|age| id| name|
+---+---+-----+
| 18|  1|  leo|
| 19|  2| jack|
| 17|  3|marry|
+---+---+-----+

scala> students.printSchema
root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

scala> students.select("name").show
+-----+
| name|
+-----+
|  leo|
| jack|
|marry|
+-----+

scala> students.select(students("name"),students("age")+1).show
+-----+---------+
| name|(age + 1)|
+-----+---------+
|  leo|       19|
| jack|       20|
|marry|       18|
+-----+---------+

scala> students.filter(students("age")>18).show
+---+---+----+
|age| id|name|
+---+---+----+
| 19|  2|jack|
+---+---+----+

scala> students.groupBy("age").count().show
+---+-----+
|age|count|
+---+-----+
| 19|    1|
| 17|    1|
| 18|    1|
+---+-----+
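Each of these operations can also be written as SQL once the DataFrame is registered as a temporary table; a minimal sketch using the sql val from above (the table name t_students is an arbitrary choice, matching the examples later in this post):

scala> students.registerTempTable("t_students")
scala> sql.sql("select name, age + 1 from t_students where age > 17").show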
Two ways to convert an RDD to a DataFrame
1) Reflection-based approach
package wujiadong_sparkSQL
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by Administrator on 2017/3/5.
*/
object RDDDataFrameReflection {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("rdddatafromareflection")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileRDD = sc.textFile("hdfs://master:9000/student/2016113012/data/students.txt")
val lineRDD = fileRDD.map(line => line.split(","))
//Associate the RDD with the case class
val studentsRDD = lineRDD.map(x => Students(x(0).toInt,x(1),x(2).toInt))
//In Scala, the reflection-based RDD-to-DataFrame conversion requires manually importing an implicit conversion
import sqlContext.implicits._
val studentsDF = studentsRDD.toDF()
//Register a temporary table
studentsDF.registerTempTable("t_students")
val df = sqlContext.sql("select * from t_students")
df.rdd.foreach(row => println(row(0)+","+row(1)+","+row(2)))
df.rdd.saveAsTextFile("hdfs://master:9000/student/2016113012/data/out")
}
}
//Defined outside main: the case class must be top-level so that Scala reflection can infer the schema
case class Students(id:Int,name:String,age:Int)
Execution result
hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.RDDDataFrameReflection --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
17/03/05 22:46:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 22:46:48 INFO Slf4jLogger: Slf4jLogger started
17/03/05 22:46:48 INFO Remoting: Starting remoting
17/03/05 22:46:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@master:34921]
17/03/05 22:46:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/03/05 22:46:51 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
17/03/05 22:47:00 INFO FileInputFormat: Total input paths to process : 1
17/03/05 22:47:07 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/03/05 22:47:07 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/03/05 22:47:07 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/03/05 22:47:07 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/03/05 22:47:07 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
1,leo,17
2,marry,17
3,jack,18
4,tom,19
17/03/05 22:47:10 INFO FileOutputCommitter: Saved output of task 'attempt_201703052247_0001_m_000000_1' to hdfs://master:9000/student/2016113012/data/out/_temporary/0/task_201703052247_0001_m_000000
2) Programmatic interface approach
package wujiadong_sparkSQL
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by Administrator on 2017/3/5.
*/
object RDDDataFrameBianchen {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDDataFrameBianchen")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//Create an RDD from the specified path
val studentsRDD = sc.textFile("hdfs://master:9000/student/2016113012/data/students.txt").map(_.split(","))
//Map the RDD to an RDD of Rows
val RowRDD = studentsRDD.map(x => Row(x(0).toInt,x(1),x(2).toInt))
//Programmatically construct the schema (metadata) at runtime
val schema = StructType(
List(
StructField("id",IntegerType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)
)
)
//Apply the schema information to the row RDD
val studentsDF = sqlContext.createDataFrame(RowRDD,schema)
//Register a temporary table
studentsDF.registerTempTable("t_students")
val df = sqlContext.sql("select * from t_students order by age")
df.rdd.collect().foreach(row => println(row))
}
}
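The programmatic route pays off when the columns are only known at runtime. A minimal sketch (the column list below is a made-up assumption, e.g. parsed from a header line or a config file) that builds the same kind of schema dynamically, using the types already imported above:

val columns = Seq(("id", IntegerType), ("name", StringType), ("age", IntegerType))
val dynamicSchema = StructType(columns.map { case (name, dataType) => StructField(name, dataType, nullable = true) })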
Execution result
hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.RDDDataFrameBianchen --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
17/03/06 11:07:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/06 11:07:27 INFO Slf4jLogger: Slf4jLogger started
17/03/06 11:07:27 INFO Remoting: Starting remoting
17/03/06 11:07:28 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@master:49756]
17/03/06 11:07:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
17/03/06 11:07:38 INFO FileInputFormat: Total input paths to process : 1
17/03/06 11:07:44 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/03/06 11:07:44 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/03/06 11:07:44 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/03/06 11:07:44 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/03/06 11:07:44 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
[1,leo,17]
[2,marry,17]
[3,jack,18]
[4,tom,19]
17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
17/03/06 11:07:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
DataFrame vs. RDD
1) In Spark, a DataFrame is a distributed dataset built on top of RDDs, analogous to a two-dimensional table in a traditional relational database.
2) The key difference between a DataFrame and an RDD is that the former carries schema metadata: every column of the two-dimensional dataset a DataFrame represents has a name and a type.
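A minimal sketch of the round trip between the two, reusing studentsDF and the Students case class from the reflection example above (assumes import sqlContext.implicits._ is in scope, as it is there):

// DataFrame -> RDD[Row]: the data survives, the schema metadata is left behind
val rowRDD = studentsDF.rdd
// RDD -> DataFrame: reattach structure by going back through the case class
val backToDF = rowRDD.map(r => Students(r.getInt(0), r.getString(1), r.getInt(2))).toDF()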