spark 倒排索引
阿新 • • 發佈:2018-12-22
1.例項描述
輸入為一批檔案,檔案內容格式如下:
Id1 The Spark
……
Id2 The Hadoop
……
輸出如下:(單詞,文件ID合併字串)
The Id1 Id2
Hadoop Id2
……
2.設計思路
先讀取所有檔案,資料項為(文件ID,文件詞集合)的RDD,然後將資料對映為(詞,文件ID)的RDD,去重,最後在reduceByKey階段聚合每個單詞的文件ID
3.程式碼
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import scala.collection.mutable
object InvertedIndex {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("InvertedIndex").setMaster("local[1]")
val sc = new SparkContext(conf)
val textRdd=sc.textFile("hdfs://master:9000/wordIndex")
val md=textRdd.map(file=>file.split("\t"))
val md2=md.map(item=>{(item(0),item(1))})
val fd=md2.flatMap(file =>{
val words=file._2.split(" ").iterator
val list=mutable.LinkedList[(String,String)]((words.next(),file._1))
var temp=list
while(words.hasNext){
temp.next=mutable.LinkedList[(String,String)]((words.next,file._1))
temp=temp.next
}
list
})
val result=fd.distinct()
val resRdd=result.reduceByKey(_+" "+_)
resRdd.saveAsTextFile("hdfs://master:9000/InvertIndex")
}
}
4.說明
其中有如下幾點要注意
rdd flatMap方法定義如下
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
方法的引數為函式,函式輸出型別為集合(的父類)。它的作用是將這些集合合併為一個新的集合,但不刪除相同的元素,也不合並rdd中的分割槽。
reduce 方法定義如下
/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
reduce 函式相當於對RDD中的元素進行reduceLeft函式的操作。reduceLeft先對兩個元素<K,V>進行reduce函式操作,然後將結果和迭代器取出的下一個元素<K,V>進行reduce函式操作,直到迭代器遍歷完所有元素,得到最後結果。
在RDD中,先對每個分割槽中的所有元素<K,V>的集合分別進行reduceLeft。每個分割槽形成的結果相當於一個元素<K,V>,再對這個結果集合進行reduceLeft操作。
例如:使用者自定義函式如下。
f:(A,B)=>(A._1+"@"+B._1 , A._2+B._2)
如圖: