spark RDD運算元(十一)之RDD Action 儲存操作saveAsTextFile,saveAsSequenceFile,saveAsObjectFile,saveAsHadoopFile 等
關鍵字:Spark運算元、Spark函式、Spark RDD行動Action、Spark RDD儲存操作、saveAsTextFile、saveAsSequenceFile、saveAsObjectFile,saveAsHadoopFile、saveAsHadoopDataset,saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
saveAsTextFile
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
saveAsTextFile用於將RDD以文字檔案的格式儲存到檔案系統中。
codec引數可以指定壓縮的類名。
var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //儲存到HDFS
hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000
hadoop fs -cat /tmp/lxw1234.com/part-00000
注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)將檔案儲存到本地檔案系統,那麼只會儲存在Executor所在機器的本地目錄。
指定壓縮格式儲存
rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec])
hadoop fs -ls /tmp/lxw1234.com
-rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r--r-- 2 lxw1234 supergroup 71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo
hadoop fs -text /tmp/lxw1234.com/part-00000.lzo
saveAsSequenceFile
saveAsSequenceFile用於將RDD以SequenceFile的檔案格式儲存到HDFS上。
用法同saveAsTextFile。
saveAsObjectFile
def saveAsObjectFile(path: String): Unit
saveAsObjectFile用於將RDD中的元素序列化成物件,儲存到檔案中。
對於HDFS,預設採用SequenceFile儲存。
var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")
hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
saveAsHadoopFile
def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[, ]], codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[, ]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
saveAsHadoopFile是將RDD儲存在HDFS上的檔案中,支援老版本Hadoop API。
可以指定outputKeyClass、outputValueClass以及壓縮格式。
每個分割槽輸出一個檔案。
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
classOf[com.hadoop.compression.lzo.LzopCodec])
saveAsHadoopDataset
def saveAsHadoopDataset(conf: JobConf): Unit
saveAsHadoopDataset用於將RDD儲存到除了HDFS的其他儲存中,比如HBase。
在JobConf中,通常需要關注或者設定五個引數:
檔案的儲存路徑、key值的class型別、value值的class型別、RDD的輸出格式(OutputFormat)、以及壓縮相關的引數。
##使用saveAsHadoopDataset將RDD儲存到HDFS中
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/lxw1234/")
rdd1.saveAsHadoopDataset(jobConf)
結果:
hadoop fs -cat /tmp/lxw1234/part-00000
A 2
A 1
hadoop fs -cat /tmp/lxw1234/part-00001
B 6
B 3
B 7
##儲存資料到HBASE
HBase建表:
create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
var conf = HBaseConfiguration.create()
var jobConf = new JobConf(conf)
jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
jobConf.set("zookeeper.znode.parent","/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
jobConf.setOutputFormat(classOf[TableOutputFormat])
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
rdd1.map(x =>
{
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsHadoopDataset(jobConf)
##結果:
hbase(main):005:0> scan 'lxw1234'
ROW COLUMN+CELL
A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds
注意:儲存到HBase,執行時候需要在SPARK_CLASSPATH中加入HBase相關的jar包。
saveAsNewAPIHadoopFile
def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
def saveAsNewAPIHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[, ]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile用於將RDD資料儲存到HDFS上,使用新版本Hadoop API。
用法基本同saveAsHadoopFile。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
saveAsNewAPIHadoopDataset
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
作用同saveAsHadoopDataset,只不過採用新版本Hadoop API。
以寫入HBase為例:
HBase建表:
create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
完整的Spark應用程式:
package com.lxw1234.test
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
object Test {
def main(args : Array[String]) {
val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
val sc = new SparkContext(sparkConf);
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
var job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
rdd1.map(
x => {
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsNewAPIHadoopDataset(job.getConfiguration)
sc.stop()
}
}
注意:儲存到HBase,執行時候需要在SPARK_CLASSPATH中加入HBase相關的jar包。