Spark 2.4.0程式設計指南--spark dataSet action
阿新 • • 發佈:2018-12-26
Spark 2.4.0程式設計指南–spark dataSet action
更多資源
視訊
- Spark 2.4.0程式設計指南–spark dataSet action(bilibili視訊) : https://www.bilibili.com/video/av38193405/?p=3
文件
前置條件
- 已安裝好java(選用的是java 1.8.0_191)
- 已安裝好scala(選用的是scala 2.11.121)
- 已安裝好hadoop(選用的是Hadoop 3.1.1)
- 已安裝好spark(選用的是spark 2.4.0)
技能標籤
- Spark session 建立
- 在Spark 2.0之後,RDD被資料集(Dataset)取代 ,保留RDD舊api
- 資料集資料集介紹
- 讀取本地檔案(txt,json),HDFS檔案
- 對txt格式檔案資料遍歷(行資料轉成物件)
- 對json格式檔案資料遍歷(直接轉物件)
- 資料集的action操作
- collect,collectAsList,count,describe,first,foreach,head,reduce,show,take,takeAsList,toLocalIterator
- 官網: http://spark.apache.org/docs/2.4.0/sql-getting-started.html
DataSet(資料集)
資料集是分散式資料集合。資料集是Spark 1.6中新增的一個新介面,它提供了RDD的優勢(強型別,使用強大的lambda函式的能力)以及Spark SQL優化執行引擎的優點。資料集可以從JVM物件構造,然後使用功能轉換(map,flatMap,filter等)進行操作。資料集API在Scala和Java中可用。 Python沒有對Dataset API的支援。但由於Python的動態特性,資料集API的許多好處已經可用(即您可以通過名稱自然地訪問行的欄位row.columnName)。 R的情況類似。
BaseSparkSession
- 公用得到SparkSession的方法
def sparkSession(isLocal:Boolean = false): SparkSession = {
if(isLocal){
master = "local"
val spark = SparkSession.builder
.master(master)
.appName(appName)
.getOrCreate()
//spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
//import spark.implicits._
spark
}else{
val spark = SparkSession.builder
.master(master)
.appName(appName)
.config("spark.eventLog.enabled","true")
.config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog")
.config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog")
.getOrCreate()
// spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
//import spark.implicits._
spark
}
}
textFile
- 讀取本地檔案
val spark = sparkSession(true)
//返回dataFrame
val df = spark.read.textFile("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
df.show()
// +-----------+
// | value|
// +-----------+
// |Michael, 29|
// | Andy, 30|
// | Justin, 19|
// | Think, 30|
// +-----------+
textFile
- 讀取HDFS檔案
val spark = sparkSession(true)
//返回dataFrame
val df = spark.read.textFile("hdfs://standalone.com:9000/home/liuwen/data/people.txt")
df.show()
// +-----------+
// | value|
// +-----------+
// |Michael, 29|
// | Andy, 30|
// | Justin, 19|
// | Think, 30|
// +-----------+
spark.stop()
text
- 讀取本地檔案
val spark = sparkSession(true)
//返回dataFrame
val df = spark.read.text("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
df.show()
// +-----------+
// | value|
// +-----------+
// |Michael, 29|
// | Andy, 30|
// | Justin, 19|
// | Think, 30|
// +-----------+
text
- 讀取HDFS資料
object Run extends BaseSparkSession{
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
//返回dataFrame
val df = spark.read.text("hdfs://standalone.com:9000/home/liuwen/data/people.txt")
df.show()
// +-----------+
// | value|
// +-----------+
// |Michael, 29|
// | Andy, 30|
// | Justin, 19|
// | Think, 30|
// +-----------+
spark.stop()
}
}
foreach 遍歷檔案內容
- 物件遍歷
object Run1 extends BaseSparkSession{
case class Person(name: String, age: Long)
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
import spark.implicits._
spark.read.textFile("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
.map(line => Person(line.split(",")(0),line.split(" ")(1).trim.toLong))
.foreach( person => println(s"name:${person.name}\t age:${person.age}"))
spark.stop()
}
}
first
- 得到dataSet的第一個元素
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
println(dataSet.first()) //first裡邊呼叫的是head()
spark.stop()
head
- 得到dataSet的第一個元素
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
println(dataSet.head()) //first裡邊呼叫的是head()
head n
- 得到dataSet的前n個元素
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
println(dataSet.head(5)) //first裡邊呼叫的是head()
count
- 得到dataSet 一共有多少行資料
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
println(dataSet.count())
collect
- 收集dataSet中所有行的資料,在本地輸出
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
println(dataSet.collect().mkString("\n"))
collectAsList
- 收集dataSet中所有的資料,轉成java.util.List物件
val spark = sparkSession(true)
val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
println( dataSet.collectAsList())
import scala.collection.JavaConversions._
for( v <- dataSet.collectAsList()) println(v)
spark.stop()
foreache
- 遍歷dataSet中的每一行資料
val spark = sparkSession(true)
val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
dataSet.foreach(println(_))
foreache class
- 以物件形式遍歷dataSet中所有的資料
object Run1 extends BaseSparkSession{
case class Person(name: String, age: Long)
def main(args: Array[String]): Unit = {
val spark = sparkSession(true)
import spark.implicits._
spark.read.textFile("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
.map(line => Person(line.split(",")(0),line.split(" ")(1).trim.toLong))
.foreach( person => println(s"name:${person.name}\t age:${person.age}"))
spark.stop()
}
}
map
- 遍歷資料集中的每一個元素,進行map函式操作
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
import spark.implicits._
val lineWordLength = dataSet.map( line => line.split(" ").size)
println(lineWordLength.collect().mkString("\n"))
reduce
- 遍歷dataSet中的元素,每兩兩進行reduce函式操作
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/word.text")
/**
* 統計所有行單詞個數
*/
import spark.implicits._
val lineWordLength = dataSet.map( line => line.split(" ").size)
val result = lineWordLength.reduce((a,b) => a + b)
println(result)
show
- 以表格形式顯示dataSet資料,預設顯示前20行資料
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/word.big.cn.text")
val result = dataSet.show()
println(result)
show n
- 以表格形式顯示dataSet資料,預設顯示前20行資料
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/word.big.cn.text")
/**
* 以表格的形式顯示前3行資料
* numRows是顯示前幾行的資料
*/
val result = dataSet.show(3)
println(result)
show truncate
- 以表格形式顯示dataSet資料,預設顯示前20行資料
- 引數truncate=false,是不截斷顯示所有資料,true是進截斷
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/word.big.text")
/**
* 以表格的形式顯示前3行資料
* numRows是顯示前幾行的資料
* false 不進行返回行資料截斷
*/
val result = dataSet.show(10,false)
println(result)
take
- take 是相當於head
val spark = sparkSession()
val dataSet = spark.read.textFile("/home/liuwen/data/word.big.txt")
val result = dataSet.take(10) //等於head(n)
println(result.mkString("\n"))
describe
val spark = sparkSession()
val dataSet = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/json/people.json")
dataSet.describe("name","age").show()
// +-------+-------+------------------+
// |summary| name| age|
// +-------+-------+------------------+
// | count| 3| 2|
// | mean| null| 24.5|
// | stddev| null|7.7781745930520225|
// | min| Andy| 19|
// | max|Michael| 30|
// +-------+-------+------------------+
end