[Spark]-RDD之創建
1.RDD的創建
1.1 從一個本地的Scala集合創建
//聲明一個本地集合 val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data) /** *分布式數據集,有一個重要參數就是數據分片數量(Spark會在每一個分片跑一個task) *本地集合創建,默認情況,Spark會根據你的集群數量自動設置分片數 *也可以手動指定這個數據集的分片(第二個參數) */ //val distData = sc.parallelize(data, 10) //一旦分布式數據集創建完畢,這個數據集就可以並行的被操作distData.reduce((a, b) => a + b)
1.2 從一個外部的存儲系統中創建
這裏外部系統,指的是任何Hadoop(InputFormat)支持的存儲系統.比如本地文本文件,HDFS,HBase,S3等等
1.2.1 textFile
val distFile = sc.textFile("hdfs://hadoop000:9000/xxx/data.txt") /** *這裏純粹的本地文件是不推薦的 *因為這個文件訪問是針對每一個Worker都要是能訪問的 * 換言之,如果是本地文件,則必須保證每一個Worker的本地都有一份這個文件*/ //val distFile = sc.textFile("/data.txt") /** *Spark支持文件目錄,壓縮文件,或者通配符等 */ //val distFile = sc.textFile("hdfs://hadoop000:9000/xxx/*.gz") /** *對於外部文件,Spark會按照128M(HDFS默認),來進行分區 *這裏依然可以手動設置分區數.但要註意的是手動設置的分區數必須要大於默認分區數 * 即只允許分的更小,但不能分得更大*/ //val distFile = sc.textFile("hdfs://hadoop000:9000/xxx/*.txt",10) distFile.map(s => s.length).reduce((a, b) => a + b)
1.2.2 wholeTextFiles
wholeTextFiles是用來讀取某個文件目錄下的多個小文件的.
與textFile的區別是,
textFile 以行斷符為分割.一個記錄就是一行
wholeTextFiles 是以文件為分割,一個記錄就是一個文件內的全部內容
wholeTextFiles的默認情況,可能導致分區數太小.這時可以手動設置調高分區數
1.2.3 sequenceFile[K, V]
將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。
該操作只支持對實現了Hadoop的Writable
接口的鍵值對RDD進行操作。
在Scala中,還支持隱式轉換為Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等等)
1.2.4 hadoopRDD
對於其它的Hadoop InputFormats,可以hadoopRDD讀取.
傳入JobConf,input format class,key class and value class(與MapReduce任務設置相同),就可以直接以MapReduce作為輸入源進行讀取
newAPIHadoopRDD
1.2.5 saveAsObjectFile
將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些保存該數據的文件,可以使用SparkContext.objectFile()進行加載
[Spark]-RDD之創建