1. 程式人生 > >spark大矩陣計算

spark大矩陣計算

輸入(每行格式為「矩陣名稱 row col value」,以空格分隔;M 為左矩陣,N 為右矩陣,只列出非零元素):

M 1 1 1
M 1 3 5
M 2 2 7
M 3 1 6
M 3 3 9
M 4 1 2
M 4 2 10
N 1 1 1
N 1 3 3
N 1 5 5
N 2 2 6
N 2 3 9
N 2 4 8
N 2 5 7
N 3 2 10 
N 3 4 12 

程式碼:

package simrank

import org.apache.spark.sql.SparkSession

object simrankMaptest {

  /**
   * Multiplies two sparse matrices stored in a single text file and prints the
   * non-zero cells of the product `M x N`.
   *
   * Every input line is expected to look like `<name> <row> <col> <value>`,
   * where `<name>` is `M` for the left operand and `N` for the right operand.
   * Entries of `M` are keyed by their column and entries of `N` by their row,
   * so joining the two pairs up exactly the factors that share the inner
   * index. The partial products are then summed per output cell.
   *
   * Each result is printed on its own line as `(<row> <col>,<value>)`.
   *
   * @param spark session whose SparkContext is used to read the input file
   */
  def test(spark: SparkSession): Unit = {
    val path = "./sinrankmap.txt"

    // Trim and split on any run of whitespace so that leading/trailing blanks,
    // repeated spaces or tabs cannot shift the field positions.
    val entries = spark.sparkContext.textFile(path).map(_.trim.split("\\s+"))

    // Select a matrix by its name field rather than by searching the whole
    // line, so a line is never assigned to the wrong (or to both) operands.
    // Blank lines tokenise to a single empty string and match neither name.
    val firstItems = entries
      .filter(tokens => tokens.headOption.contains("M"))
      .map(tokens => (tokens(2), (tokens(1), tokens(3)))) // key: column of M

    val secondItems = entries
      .filter(tokens => tokens.headOption.contains("N"))
      .map(tokens => (tokens(1), (tokens(2), tokens(3)))) // key: row of N

    // One partial product per matching (M entry, N entry) pair, keyed by the
    // output cell "<row of M> <col of N>".
    val newItems = firstItems.join(secondItems).values.map {
      case ((row, mValue), (col, nValue)) =>
        (s"$row $col", mValue.toFloat * nValue.toFloat)
    }

    val result = newItems.reduceByKey(_ + _)
    result.collect().foreach(cell => print(s"$cell\n"))
  }

}

輸出(M × N 的非零元素,格式為「(row col,value)」;結果由 reduceByKey 產生,順序不固定):

(1 2,50.0)
(3 1,6.0)
(3 3,18.0)
(4 1,2.0)
(3 4,108.0)
(3 2,90.0)
(1 4,60.0)
(4 5,80.0)
(1 5,5.0)
(4 3,96.0)
(2 2,42.0)
(3 5,30.0)
(2 5,49.0)
(2 4,56.0)
(4 4,80.0)
(2 3,63.0)
(1 1,1.0)
(1 3,3.0)
(4 2,60.0)