Spark(五)資料讀取與儲存
目錄:
5、資料讀取與儲存
5.1、檔案格式
表5-1:Spark支援的一些常見格式
格式檔案 |
結構化 |
備註 |
文字檔案 |
否 |
普通的文字檔案,每行一條記錄 |
JSON |
半結構化 |
常見的基於文字的格式,通常在電子表格應用中使用一種用於鍵值對資料的常見Hadoop檔案格式 |
CSC |
是 |
非常常見的基於文字的格式,通常在電子表格應用各種使用 |
SequenceFiles |
是 |
一種用於鍵值對資料的常見Hadoop檔案格式 |
Protocol buffers |
是 |
一種快速、節約空間的跨語言格式 |
物件檔案 |
是 |
用來將Spark作業中的資料儲存下來以讓共享的程式碼讀取。改變類的時候它會失效,因為它依賴於Java序列化 |
5.1.1、文字檔案
在Spark中讀寫文字檔案很容易。當我們將一個文字檔案讀取為RDD時,輸入的一行都會成為RDD的一個元素。也可以將多個完成的文字檔案一次性讀取為一個pairRDD,其中鍵是檔名,值是檔案內容。
1、讀取文字檔案:
只需要使用檔案路徑作為引數呼叫SparkContext中的textFile()的函式,就可以讀取一個文字檔案。
在Scale中讀取一個文字檔案:
val input = sc.textFile(“README.MD”)
在Java中讀取一個文字檔案:
JavaRDD<String> input = sc.textFile(“README.ME”);
如果多個輸入檔案以一個包含資料所有部分的目錄的形式出現,可以用兩種方式來處理。可以仍使用textFile函式,傳遞目錄作為函式,這樣它會把各部分都讀取到RDD中。有時候有必要知道資料的各部分分別來自哪個檔案(比如將鍵放在檔名中的時間資料),有時候則希望同時處理整個檔案。如果檔案足夠小,那麼可以使用SparkContext.wholeTextFiles()方法,該方法會返回一個pairRDD,其中鍵是輸入檔案的檔名。
2、儲存文字檔案:
saveAsTextFile()方法接收一個路徑,並將RDD中的內容都輸入到路徑對應的檔案中。Spark將傳入的路徑作為目錄對待,會在那個目錄下輸出多個檔案。
5.1.2、JSON
1、在Java中讀取JSON:
2、儲存JSON
5.1.3、逗號分隔值與製表符分隔值
1、讀取CSV:
如果恰好你的CSV的所有資料欄位均沒有包含換行符,可以直接使用textFile()讀取並解析資料。
在Java中使用textFile()讀取CSV:
如果在欄位中嵌有換行符,就需要完整讀入每個檔案,然後解析各段。
在Java中完整讀取CSV:
2、儲存CSV:
呼叫saveAsTextFile(outFile)方法。
5.1.4、SequenceFile
SequenceFile是由沒有相對關係結構的鍵值對檔案組成的常用Hadoop格式。SequenceFile檔案有同步標記,Spark可以用來定位到檔案中某個點,然後再與記錄的邊界對其。
由於 Hadoop 使用了一套自定義的序列化框架,因此 SequenceFile 是由實現 Hadoop 的 Writable
介面的元素組成。
表5-2:HadoopWritable型別對應表:
1、讀取SequenceFile:
2、在Java中儲存SequenceFile
在Java 中儲存SequenceFile要稍微複雜一些,因為JavaPairRDD上沒有saveAsSequenceFile()方法。我們要使用Spark儲存自定義Hadoop格式的功能來實現。
5.1.5、物件檔案
物件檔案看起來就像是對 SequenceFile 的簡單封裝,它允許儲存只包含值的 RDD。和SequenceFile 不一樣的是,物件檔案是使用 Java 序列化寫出的。
要儲存物件檔案,只需在 RDD 上呼叫 saveAsObjectFile() 就行了。讀回物件檔案也相當簡單:用 SparkContext 中的 objectFile() 函式接收一個路徑,返回對應的 RDD。
5.2、檔案系統
5.2.1、本地/“常規”檔案系統
Spark支援從本地系統檔案中讀取檔案,不夠它要求檔案在叢集中所有節點的相同路徑下都可以找到。一些像 NFS、AFS 以及 MapR 的 NFS layer 這樣的網路檔案系統會把檔案以常規檔案系統的形式暴露給使用者。如果你的資料已經在這些系統中,那麼你只需要指定輸入為一個 file://路徑;只要這個檔案系統掛載在每個節點的同一個路徑下,Spark 就會自動處理。
如果檔案還沒有放在叢集中的所有節點上,你可以在驅動器程式中從本地讀取該檔案而無需使用整個叢集,然後再呼叫 parallelize 將內容分發給工作節點。不過這種方式可能會比較慢,所以推薦的方法是將檔案先放到像 HDFS、NFS、S3 等共享檔案系統上。
5.2.3、HDFS
在 Spark 中使用 HDFS 只需要將輸入輸出路徑指定為 hdfs://master:port/path 就夠了。