Spark (10): Reading and Saving RDDs
阿新 • Published: 2020-08-06
I. File Types
1. Text Files
Read and Write
Read:
scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
Save:
scala> hdfsFile.saveAsTextFile("/fruitOut")
2. JSON Files
Parsing JSON with plain RDDs is cumbersome, while Spark SQL has good built-in JSON support, so in practice JSON files are usually handled with Spark SQL rather than raw RDDs.
(1) Import the JSON parsing package
scala> import scala.util.parsing.json.JSON
(2) Upload the JSON file to HDFS
[atguigu@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
(3) Read the file
scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
(4) Parse the JSON data
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
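JSON.parseFull yields an Option[Any]: Some wrapping a Map[String, Any] for a JSON object, None when the line fails to parse. A minimal pure-Scala sketch of unpacking one element of result above (the object name JsonUnpack and the sample map standing in for a parsed line of people.json are illustrative):

```scala
object JsonUnpack {
  // Extract the "name" field from one parsed element of `result`.
  def extractName(parsed: Option[Any]): Option[String] = parsed match {
    // a successfully parsed JSON object arrives as a Map[String, Any]
    case Some(m: Map[_, _]) =>
      m.asInstanceOf[Map[String, Any]].get("name").map(_.toString)
    // None means the line was not valid JSON
    case _ => None
  }
}
```

Note that JSON.parseFull represents all JSON numbers as Double, so a field like age comes back as 30.0, not 30.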
3. Object Files
Object files store objects in serialized form, using Java's serialization mechanism. sc.objectFile[T](path) takes a path, reads an object file, and returns the corresponding RDD; calling saveAsObjectFile() writes an RDD out as an object file. Because serialization is involved, the element type must be specified when reading.
Read and Write
(1) Create an RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2) Save the RDD as an object file
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
(3) Inspect the output
[hadoop@hadoop102 objectFile]$ pwd
/opt/module/spark/objectFile
[hadoop@hadoop102 objectFile]$ ll
total 8
-rw-r--r-- 1 atguigu atguigu 142 Oct 9 10:37 part-00000
-rw-r--r-- 1 atguigu atguigu 142 Oct 9 10:37 part-00001
-rw-r--r-- 1 atguigu atguigu 0 Oct 9 10:37 _SUCCESS
[hadoop@hadoop102 objectFile]$ cat part-00000
SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableW@`l
(4) Read the object file back
scala> val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
(5) Print the contents
scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)
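Under the hood, saveAsObjectFile serializes records with standard Java serialization and stores the bytes in a SequenceFile of NullWritable/BytesWritable pairs, which is exactly what the cat output above shows. A pure-Scala sketch of the per-record round-trip, without Spark (the object name JavaSerSketch is illustrative):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object JavaSerSketch {
  // serialize a value with Java serialization, as saveAsObjectFile does
  def toBytes(v: Any): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(v.asInstanceOf[AnyRef]) // primitives are boxed before writing
    oos.close()
    bos.toByteArray
  }

  // deserialize; the caller must know the element type, which is why
  // sc.objectFile[Int](...) requires the type parameter when reading
  def fromBytes[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    ois.readObject().asInstanceOf[T]
  }
}
```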
4. Sequence Files
Rarely used nowadays.
Note: SequenceFile applies only to pair RDDs (RDD[(K, V)]).
(1) Create an RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2) Save the RDD as a SequenceFile
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
(3) Inspect the output
[hadoop@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile
[hadoop@hadoop102 seqFile]$ ll
total 8
-rw-r--r-- 1 atguigu atguigu 108 Oct 9 10:29 part-00000
-rw-r--r-- 1 atguigu atguigu 124 Oct 9 10:29 part-00001
-rw-r--r-- 1 atguigu atguigu 0 Oct 9 10:29 _SUCCESS
[hadoop@hadoop102 seqFile]$ cat part-00000
SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
(4) Read the SequenceFile
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
(5) Print the contents
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
II. External Storage Systems
1. MySQL
Dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
Read
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: Read data from MySQL
 * @author: HaoWu
 * @create: 2020-08-05
 */
object MySqlReadWriteTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd: JdbcRDD[(Int, String)] = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
      },
      "select * from project_files where project_id >= ? and project_id <= ?;",
      1, // lower bound, bound to the first ?
      4, // upper bound, bound to the second ?
      1, // number of partitions
      // mapRow: converts each row of the JDBC ResultSet into an RDD element
      r => (r.getInt(1), r.getString(2)))
    println(rdd.count())
    rdd.foreach(println(_))
    sc.stop()
  }
}
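The three bare numbers in the call above (1, 4, 1) are the lower bound, the upper bound, and the number of partitions: JdbcRDD splits the inclusive range [lowerBound, upperBound] into that many sub-ranges and binds each pair to the two ? placeholders, so each partition runs its own bounded query. A pure-Scala sketch of that splitting, modeled on JdbcRDD's partitioning logic (simplified; not Spark's actual class):

```scala
object JdbcRanges {
  // split [lower, upper] (inclusive) into numPartitions contiguous
  // sub-ranges, one per JdbcRDD partition
  def ranges(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val length = BigInt(1) + upper - lower // number of ids in the range
    (0 until numPartitions).map { i =>
      val start = lower + ((i * length) / numPartitions).toLong
      val end = lower + (((i + 1) * length) / numPartitions).toLong - 1
      (start, end)
    }
  }
}
```

With the arguments from the example (1, 4, 1), the single partition queries project_id between 1 and 4; with two partitions it would split into (1,2) and (3,4).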
Write
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: Insert data into MySQL
 * @author: HaoWu
 * @create: 2020-08-05
 */
object MySqlReadWriteTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
    val sc: SparkContext = new SparkContext(sparkConf)
    val list = List((1, 9), (1, 10))
    val rdd: RDD[(Int, Int)] = sc.makeRDD(list)
    // foreachPartition is more efficient than foreach here: one MySQL
    // connection per partition instead of one per record
    rdd.foreachPartition(iter => {
      // create the Connection
      val con: Connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/azkaban", "root", "root")
      // prepare the SQL
      val sql = "insert into project_files(project_id,version) values(?,?)"
      val ps: PreparedStatement = con.prepareStatement(sql)
      // insert the partition's records one by one
      iter.foreach {
        case (project_id, version) =>
          ps.setInt(1, project_id)
          ps.setInt(2, version)
          // execute the SQL
          ps.executeUpdate()
      }
      ps.close()
      con.close()
    })
    sc.stop()
  }
}
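The loop above still makes one executeUpdate round-trip per record. A further improvement, not shown in the original, is standard JDBC batching: queue rows with addBatch and flush them with executeBatch once per chunk. A sketch (the object name BatchInsert and the batch size 500 are arbitrary choices; the SQL is the same insert as above):

```scala
import java.sql.PreparedStatement

object BatchInsert {
  // group records into batches of `batchSize`; exposed separately so the
  // chunking logic can be checked without a live connection
  def chunked[T](iter: Iterator[T], batchSize: Int): Iterator[Seq[T]] =
    iter.grouped(batchSize)

  // batched variant of the partition loop: ps must be prepared over
  // "insert into project_files(project_id,version) values(?,?)"
  def insertBatched(ps: PreparedStatement, iter: Iterator[(Int, Int)], batchSize: Int = 500): Unit =
    chunked(iter, batchSize).foreach { chunk =>
      chunk.foreach { case (projectId, version) =>
        ps.setInt(1, projectId)
        ps.setInt(2, version)
        ps.addBatch() // queue the row instead of executing immediately
      }
      ps.executeBatch() // one round-trip per chunk
    }
}
```

Inside foreachPartition, insertBatched(ps, iter) would replace the iter.foreach block.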
2. HBase
Dependencies
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-mapreduce</artifactId>
    <version>2.0.0</version>
</dependency>
Put HBase's configuration file hbase-site.xml into the resources directory, keeping the ZooKeeper connection setting:
<property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
</property>
Read
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: Read from HBase
 * @author: HaoWu
 * @create: 2020-08-05
 */
object HbaseReadWriterTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // create the configuration; by default it loads the Hadoop and HBase
    // configuration files (e.g. hbase-site.xml) from the classpath
    val conf: Configuration = HBaseConfiguration.create()
    // set which table to read
    conf.set(TableInputFormat.INPUT_TABLE, "bigdata:user")
    // the core step: create an RDD over the HBase table
    val rdd = new NewHadoopRDD[ImmutableBytesWritable, Result](sc,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result],
      conf)
    rdd.foreach {
      case (rowKey, result) =>
        // CellUtil extracts a single attribute from a Cell;
        // Bytes converts between Java types and byte[]
        // get all cells of the row
        val cells: Array[Cell] = result.rawCells()
        for (cell <- cells) {
          println(Bytes.toString(CellUtil.cloneRow(cell)) + " " +
            Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " " +
            Bytes.toString(CellUtil.cloneValue(cell)))
        }
    }
    sc.stop()
  }
}
Write
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @description: Write to HBase
 * @author: HaoWu
 * @create: 2020-08-05
 */
object HbaseReadWriterTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    // create the configuration; by default it loads the Hadoop and HBase
    // configuration files from the classpath
    val conf: Configuration = HBaseConfiguration.create()
    // set which table to write to
    conf.set(TableOutputFormat.OUTPUT_TABLE, "bigdata:user")
    // carry the output settings in a Hadoop Job
    val job: Job = Job.getInstance(conf)
    // set the output format
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // set the output key and value types
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    // wrap the data in a list: (rowkey, (column family, qualifier, value))
    val list = List(("1005", ("info2", "age", "20")), ("1005", ("info2", "name", "marry")), ("1006", ("info2", "age", "21")))
    val rdd: RDD[(String, (String, String, String))] = sc.makeRDD(list, 2)
    // map the data into the output key-value types
    val rdd2: RDD[(ImmutableBytesWritable, Put)] = rdd.map {
      case (rowkey, (cf, cq, v)) =>
        // wrap the rowkey
        val key = new ImmutableBytesWritable()
        key.set(Bytes.toBytes(rowkey))
        // wrap the Put
        val value = new Put(Bytes.toBytes(rowkey))
        value.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(v))
        (key, value)
    }
    // write out using the configuration set up above
    rdd2.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
Pitfall
When running the HBase read example, the program failed with the error below.
Cause: in the pom, the HBase dependencies must be placed after the spark-core dependency; otherwise dependency resolution picks up a Jackson Databind version that is incompatible with Spark's jackson-module-scala, producing the JsonMappingException shown at the bottom of the trace.
java.lang.ExceptionInInitializerError
at org.apache.spark.SparkContext.withScope(SparkContext.scala:751)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:882)
at com.spark.rdd.RDDTest.testMap(RDDTest.scala:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0
at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
at com.fasterxml.jackson.module.scala.JacksonModule.setupModule$(JacksonModule.scala:46)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:17)
at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
... 27 more