1. 程式人生 > 實用技巧 >Spark專案實戰從0到1之(1)Spark讀取和儲存HDFS上的資料

Spark專案實戰從0到1之(1)Spark讀取和儲存HDFS上的資料

本篇來介紹一下通過Spark來讀取和HDFS上的資料,主要包含四方面的內容:將RDD寫入HDFS、讀取HDFS上的檔案、將HDFS上的檔案新增到Driver、判斷HDFS上檔案路徑是否存在。

1、啟動Hadoop

首先啟動咱們的Hadoop,在hadoop的目錄下執行下面的命令:

rm -rf tmp 
mkdir tmp
cd sbin
hadoop namenode -format
start-dfs.sh
start-yarn.sh

檢視是否啟動成功:

2、將RDD寫入HDFS

先建立一個SparkSession:

val spark = SparkSession
      .
builder() .appName("Spark SQL basic example") .enableHiveSupport() .getOrCreate() spark.sparkContext.setLogLevel("WARN")

將RDD寫入HDFS使用的函式是saveAsTextFile:

val modelNames = Array("FM","FFM","DEEPFM","NFM","DIN","DIEN")
val modelNamesRdd = spark.sparkContext.parallelize(modelNames,
1) modelNamesRdd.saveAsTextFile("hdfs://localhost:9000/user/root/modelNames")

接下來,我們檢視一下是否儲存成功:

可以看到RDD在HDFS上是分塊儲存的,由於我們只有一個分割槽,所以只有part-0000。假設我們儲存一個包含兩個分割槽的RDD:

val modelNames3 = Array("FM","FFM","DEEPFM","NFM","DIN","DIEN")
val modelNames3Rdd = spark.sparkContext.parallelize(modelNames3,2)

modelNames3Rdd.
saveAsTextFile("hdfs://localhost:9000/user/root/modelNames3")

再次檢視,可以看到有part-00000和part-00001:

3、讀取HDFS上的檔案

讀取HDFS上的檔案,使用textFile方法:

 val modelNames2 = spark.sparkContext.textFile("hdfs://localhost:9000/user/root/modelNames/part-00000")

val modelNames4 = spark.sparkContext.textFile("hdfs://localhost:9000/user/root/modelNames3/")

讀取時是否加最後的part-00000都是可以的,當只想讀取某個part,則必須加上。

4、將HDFS上的檔案新增到Driver

有時候,我們並不想直接讀取HDFS上的檔案,而是想對應的檔案新增到Driver上,然後使用java或者Scala的I/O方法進行讀取,此時使用addFile和get方法來實現:

val files = "hdfs://localhost:9000/user/root/modelNames/part-00000"
spark.sparkContext.addFile(files)
val path = SparkFiles.get("part-00000")
println(path)

列印的路徑十分奇怪,沒有擷取完全:

然後有了path之後,就可以使用scala的I/O進行讀取:

val source = Source.fromFile(path)
val lineIterator = source.getLines
val lines =lineIterator.toArray
println(lines.mkString(","))

輸出為:

FM,FFM,DEEPFM,NFM,DIN,DIEN

5、判斷HDFS上檔案路徑是否存在

在讀取HDFS地址或者將檔案傳輸到Driver上的時候,首先需要判斷檔案是否存在。單機環境下,程式碼如下:

val conf = spark.sparkContext.hadoopConfiguration

val path = new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames/part-00000")
val fs = path.getFileSystem(conf) //得hdfs檔案系統中的路徑資訊

val modelNamesExists = fs.exists(path)
val modelNames1Exists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames1/part-00000"))

println(modelNamesExists)
println(modelNames1Exists)

輸出結果為:

true
false

而在公司中的大規模叢集環境下,通常的程式碼如下:

val conf = spark.sparkContext.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)

val modelNamesExists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames/part-00000"))
val modelNames1Exists = fs.exists(new org.apache.hadoop.fs.Path("hdfs://localhost:9000/user/root/modelNames1/part-00000"))

println(modelNamesExists)
println(modelNames1Exists)

如果在本地單機環境下仍然使用上面的程式碼,會報如下的錯誤:

Wrong FS: hdfs://localhost:9000/user/root/modelNames/part-00000, expected: file:///

所以對比兩份程式碼你可以發現,在本地環境中,我們首先使用getFileSystem獲取了hdfs檔案系統中的路徑資訊,從而避免了上面的錯誤。

好了,今天的知識就分享到這裡,小夥伴們都掌握了麼?