1. 程式人生 > >spark 倒排索引

spark 倒排索引

1.例項描述 輸入為一批檔案,檔案內容格式如下: Id1 The Spark …… Id2 The Hadoop …… 輸出如下:(單詞,文件ID合併字串) The    Id1 Id2 Hadoop    Id2 …… 2.設計思路 先讀取所有檔案,資料項為(文件ID,文件詞集合)的RDD,然後將資料對映為(詞,文件ID)的RDD,去重,最後在reduceByKey階段聚合每個單詞的文件ID 3.程式碼 import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ import scala.collection.mutable object InvertedIndex {   def main(args: Array[String]) {     val conf = new SparkConf().setAppName("InvertedIndex").setMaster("local[1]")     val sc = new SparkContext(conf)     val textRdd=sc.textFile("hdfs://master:9000/wordIndex")     val md=textRdd.map(file=>file.split("\t"))     val md2=md.map(item=>{(item(0),item(1))})     val fd=md2.flatMap(file =>{       val words=file._2.split(" ").iterator       val list=mutable.LinkedList[(String,String)]((words.next(),file._1))       var temp=list       while(words.hasNext){         temp.next=mutable.LinkedList[(String,String)]((words.next,file._1))         temp=temp.next       }       list     })     val result=fd.distinct()     val resRdd=result.reduceByKey(_+" "+_)
    resRdd.saveAsTextFile("hdfs://master:9000/InvertIndex")   } } 4.說明 其中有如下幾點要注意 rdd flatMap方法定義如下 /**    *  Return a new RDD by first applying a function to all elements of this    *  RDD, and then flattening the results.    */   def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =     new FlatMappedRDD(this, sc.clean(f)) 方法的引數為函式,函式輸出型別為集合(的父類)。它的作用是將這些集合合併為一個新的集合,但不刪除相同的元素,也不合並rdd中的分割槽。 reduce 方法定義如下 /**    * Reduces the elements of this RDD using the specified commutative and    * associative binary operator.    */   def reduce(f: (T, T) => T): T = {     val cleanF = sc.clean(f)     val reducePartition: Iterator[T] => Option[T] = iter => {       if (iter.hasNext) {         Some(iter.reduceLeft(cleanF))
      } else {         None       }     }     var jobResult: Option[T] = None     val mergeResult = (index: Int, taskResult: Option[T]) => {       if (taskResult.isDefined) {         jobResult = jobResult match {           case Some(value) => Some(f(value, taskResult.get))           case None => taskResult         }       }     }     sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty     jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))   } reduce 函式相當於對RDD中的元素進行reduceLeft函式的操作。reduceLeft先對兩個元素<K,V>進行reduce函式操作,然後將結果和迭代器取出的下一個元素<K,V>進行reduce函式操作,直到迭代器遍歷完所有元素,得到最後結果。 在RDD中,先對每個分割槽中的所有元素<K,V>的集合分別進行reduceLeft。每個分割槽形成的結果相當於一個元素<K,V>,再對這個結果集合進行reduceLeft操作。 例如:使用者自定義函式如下。 f:(A,B)=>(A._1+"@"+B._1 , A._2+B._2) 如圖: