1. 程式人生 > >Spark實現之 好友推薦

Spark實現之 好友推薦

網上好多好推薦之類的部落格都是基於二度好友的推薦,下面介紹基於鄰接表的好友推薦的spark實現。

輸入:

    1 2,3,4,5,6,7,8  
    2 1,3,4,5,7  
    3 1,2  
    4 1,2,6  
    5 1,2  
    6 1,4  
    7 1,2  
    8 1  
第一列表示使用者,後面的表示使用者的所有好友。

需求:找出使用者好友的好友且與使用者不是好友的,推薦給使用者。

思路:1、key為user,value可設計成user有關聯關係的人及關聯關係的說明,例如:

                    key:"1"  value:<"2", "yes"> 表示"1"與"2"已經是好友關係

                    key:"2" value:<"3", "1">表示"2"與"3"共同的好友為"1"

            2、對reduce後的key的value做歸類,比如把key為"2"的value的第一個值為"3"的value歸為一組,這一組value中的第二個值若沒有"yes"出現,則把"3"推薦給"2",若有"yes"出現,說明"2"與"3"已為好友關係。

步驟:1、map:讀取每行資料,組合user與每一個好友,兩兩組合user的好友。

            2、reduce:分析value的第一個值是否時key好友,若不是則可以推薦給key。

package dabook

import org.apache.spark.{SparkConf, SparkContext}
import collection.mutable.{ HashMap, MultiMap, Set }

import scala.collection.mutable

object RecommFriend {

  def mymap(x:String): List[(String, (String, String))] ={
    var result = List[(String, (String, String))]()
    val arr = x.split(" ")
    if(arr.length != 2){
      return result
    }
    val host = arr(0).trim //主體
    val friends = arr(1).trim
    val arr_frinds = friends.split(",") //主體的列表

    for (s <- arr_frinds){
      result ::= (host, (s, "yes")) //yes表示已為好友關係
    }

    for(i <- Range(0, arr_frinds.length); j <- Range(i+1, arr_frinds.length)){
      result ::= (arr_frinds(i), (arr_frinds(j), host)) //好友列表兩兩分組,host是他們共同的好友
      result ::= (arr_frinds(j), (arr_frinds(i), host))
    }
    result
  }

  def myreduce(x:Iterable[(String, String)]): String ={
    var result = new String()

    var mm = new mutable.HashMap[String, Set[String]] with mutable.MultiMap[String, String]
    for(s <- x){
      mm.addBinding(s._1, s._2)
    }
    var sb = new mutable.StringBuilder()
    val ks = mm.keys
    for(k <- ks){
      var flag = true
      val values = mm.get(k)
      for (v <- values.get){
        if(v.equals("yes")){
          flag = false
        }
      }
      if(flag){
        sb.append("推薦好友:" + k + " 共同的好友:")
        sb.append(values.get.mkString(", "))
        sb.append(" | ")
      }
      if(sb.length > 2){
        result = sb.substring(0, sb.length-3)
      }
      else {
        result = sb.toString()
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("recommfriend")
    val sc = new SparkContext(sparkConf)
    val input = sc.textFile("/home/xdk/file/friend")
    val res = input.flatMap(x=>mymap(x)).groupByKey().map(x=>(x._1, myreduce(x._2)))
    res.foreach(println)

    //test1()

  }

  def test(): Unit ={
    val ss = "1 2,3,4"
    val res = mymap(ss)
    println(res.size)
    println(res)
  }

  def test1(): Unit ={
    var mmm = new mutable.HashMap[String, Set[String]] with mutable.MultiMap[String, String]
    mmm.addBinding("123", "asd")
    mmm.addBinding("123", "dsad")
    mmm.addBinding("43345", "dasd")

    val keys1 = mmm.keys
    println(keys1)
    for(s <- keys1){
      println(s)
    }
  }

}

輸出:

(8,推薦好友:2 共同的好友:1 | 推薦好友:5 共同的好友:1 | 推薦好友:7 共同的好友:1 | 推薦好友:4 共同的好友:1 | 推薦好友:6 共同的好友:1 | 推薦好友:3 共同的好友:1)
(7,推薦好友:5 共同的好友:1, 2 | 推薦好友:8 共同的好友:1 | 推薦好友:4 共同的好友:1, 2 | 推薦好友:6 共同的好友:1 | 推薦好友:3 共同的好友:1, 2)
(4,推薦好友:5 共同的好友:1, 2 | 推薦好友:8 共同的好友:1 | 推薦好友:7 共同的好友:1, 2 | 推薦好友:3 共同的好友:1, 2)
(5,推薦好友:8 共同的好友:1 | 推薦好友:7 共同的好友:1, 2 | 推薦好友:4 共同的好友:1, 2 | 推薦好友:6 共同的好友:1 | 推薦好友:3 共同的好友:1, 2)
(6,推薦好友:2 共同的好友:4, 1 | 推薦好友:5 共同的好友:1 | 推薦好友:8 共同的好友:1 | 推薦好友:7 共同的好友:1 | 推薦好友:3 共同的好友:1)
(3,推薦好友:5 共同的好友:1, 2 | 推薦好友:8 共同的好友:1 | 推薦好友:7 共同的好友:1, 2 | 推薦好友:4 共同的好友:1, 2 | 推薦好友:6 共同的好友:1)
(2,推薦好友:8 共同的好友:1 | 推薦好友:6 共同的好友:4, 1)
(1,)