Spark Operators: Ways to Create an RDD
Posted by 阿新 on 2018-12-30
There are broadly two ways to create an RDD: (1) from a collection; (2) from external storage.
1. Creating from a collection
(1)parallelize:def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
The parameter seq is the source Seq collection, and numSlices is the number of partitions.
scala> var rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :21
scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd.partitions.size
res4: Int = 15
//Create the RDD with 3 partitions instead of the default
scala> var rdd2 = sc.parallelize(1 to 10, 3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :21
scala> rdd2.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd2.partitions.size
res6: Int = 3
(2)makeRDD
1)def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
2)def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]
Variant 1) behaves exactly like parallelize; variant 2) additionally lets you specify the preferredLocations (the preferred worker nodes) for each partition.
scala> var collect = Seq((1 to 10, Seq("slave007.lxw1234.com","slave002.lxw1234.com")),
     |   (11 to 15, Seq("slave013.lxw1234.com","slave015.lxw1234.com")))
collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))
scala> var rdd = sc.makeRDD(collect)
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at :23
scala> rdd.partitions.size
res33: Int = 2
scala> rdd.preferredLocations(rdd.partitions(0))
res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)
scala> rdd.preferredLocations(rdd.partitions(1))
res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)
2. Creating from external storage
(1) Text files on HDFS: textFile
//Create from an HDFS file
scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at :21
scala> rdd.count
res48: Long = 4
//Create from a local file
scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at :21
scala> rdd.count
res49: Long = 97
(2) Other file formats on HDFS: hadoopFile, sequenceFile, objectFile, and newAPIHadoopFile
(3) Creating via the Hadoop API: hadoopRDD and newAPIHadoopRDD
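As a brief illustration of two of these methods, the sketch below saves an RDD as an object file and reads it back, and reads a SequenceFile of (String, Int) pairs. The HDFS paths are hypothetical, and a live SparkContext `sc` is assumed:

```scala
// Hypothetical paths; assumes `sc` is an existing SparkContext
// Save an RDD as a Hadoop object file, then recreate an RDD from it
val nums = sc.parallelize(1 to 100)
nums.saveAsObjectFile("hdfs:///tmp/lxw1234/obj")
val objRDD = sc.objectFile[Int]("hdfs:///tmp/lxw1234/obj")

// Read a SequenceFile; Spark's implicit WritableConverters map
// Text/IntWritable to String/Int for us
val seqRDD = sc.sequenceFile[String, Int]("hdfs:///tmp/lxw1234/data.seq")
```

Note that the element type passed to objectFile must match the type that was saved, since the data is deserialized with Java serialization.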
Example: creating an RDD from HBase
scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HBaseAdmin
scala> val conf = HBaseConfiguration.create()
scala> conf.set(TableInputFormat.INPUT_TABLE,"lxw1234")
scala> var hbaseRDD = sc.newAPIHadoopRDD(
     |   conf,
     |   classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
     |   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
     |   classOf[org.apache.hadoop.hbase.client.Result])
scala> hbaseRDD.count
res52: Long = 1
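The RDD above contains (ImmutableBytesWritable, Result) pairs, which are not directly readable. A minimal sketch of extracting row keys and one cell value follows; the column family "cf" and qualifier "col" are hypothetical and depend on the actual table schema:

```scala
import org.apache.hadoop.hbase.util.Bytes

// Map each (row key, Result) pair to readable strings.
// "cf" and "col" are placeholder family/qualifier names.
val rows = hbaseRDD.map { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  val value  = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col")))
  (rowKey, value)
}
rows.collect.foreach(println)
```

Doing the byte-to-string conversion inside map keeps the non-serializable HBase objects from being shipped back to the driver.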