2 - Movie Recommendation Case Study

Study the code's ideas and style and the overall architectural approach; do not copy everything wholesale.

Case Source

  • The 尚矽谷 (Atguigu) recommender-system project case

Overall Architecture

  • Overview
    • Statistics module: recalls the all-time most popular, recently most popular, and highest-average-rated movies, plus a rating top 10 per genre
    • Offline recommendation: uses ALS to recall the top-N movies closest to each user and most similar to each movie
    • Real-time recommendation: using the movie similarity matrix computed offline together with the user's K most recent ratings, compute for each movie similar to the one just rated its average similarity-weighted score against those K recent ratings, blend in boost and penalty factors, and rank the similar-movie list for the currently rated movie (see the formula after this list)
    • Content-based recommendation: computes movie-to-movie similarity with TF-IDF to obtain a movie similarity matrix; the recall logic itself is not implemented (a sketch is given at the end of this article)
    • Gaps: there is no filtering module and no blending/re-ranking module (whether these are needed depends on the scenario)
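
For reference, the priority that the real-time module assigns to a candidate movie q (the computeMovieScore function in the StreamingRecommender code below) can be written as:

$$E_{uq} = \frac{\sum_{r \in RK} \mathrm{sim}(q, r) \cdot R_r}{\mathrm{count}} + \lg(\mathit{incount}_q) - \lg(\mathit{decount}_q)$$

Here RK is the set of the user's K most recent ratings, R_r is the score of recent rating r, and only pairs with sim(q, r) > 0.7 contribute to the sum and the count; incount_q counts the contributing recent ratings scored above 3 (the boost factor) and decount_q the rest (the penalty factor). The offline and content modules both measure movie similarity as the cosine of the corresponding feature vectors:

$$\mathrm{sim}(a, b) = \frac{a \cdot b}{\lVert a \rVert_2 \, \lVert b \rVert_2}$$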

Code Structure and POM Configuration

  • Code structure
  • Recommender project parent POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lotuslaw</groupId>
    <artifactId>MovieRecommendSystem</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>recommender</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>
        <mongodb-spark.version>2.0.0</mongodb-spark.version>
        <casbah.version>3.1.1</casbah.version>
        <elasticsearch-spark.version>5.6.2</elasticsearch-spark.version>
        <elasticsearch.version>5.6.2</elasticsearch.version>
        <redis.version>2.9.0</redis.version>
        <kafka.version>0.10.2.1</kafka.version>
        <spark.version>2.1.1</spark.version>
        <scala.version>2.11.8</scala.version>
        <jblas.version>1.2.1</jblas.version>
    </properties>

    <dependencies>
        <!-- Shared logging dependencies for all modules -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Declare and apply plugins shared by the child modules -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <!-- Compile everything with JDK 1.8 -->
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!-- Maven assembly (packaging) plugin -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- Compiles Scala sources to class files -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <!-- Bind to Maven's compile phase -->
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
  • recommender pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>MovieRecommendSystem</artifactId>
        <groupId>com.lotuslaw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>recommender</artifactId>
    <packaging>pom</packaging>
    <modules>
        <module>DataLoader</module>
        <module>StatisticsRecommender</module>
        <module>OfflineRecommender</module>
        <module>StreamingRecommender</module>
        <module>ContentRecommender</module>
        <module>KafkaStream</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- Spark dependencies -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-graphx_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <!-- The parent already declares this plugin; child modules can omit the version and inherited configuration -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
  • DataLoader pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>recommender</artifactId>
        <groupId>com.lotuslaw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>DataLoader</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <!-- MongoDB drivers -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
        <!-- Elasticsearch drivers -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>${elasticsearch-spark.version}</version>
            <!-- Exclude transitive dependencies that are not needed -->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-service</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</project>
  • StatisticsRecommender pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>recommender</artifactId>
        <groupId>com.lotuslaw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>StatisticsRecommender</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <!-- MongoDB drivers -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>
</project>
  • OfflineRecommender pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>recommender</artifactId>
        <groupId>com.lotuslaw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>OfflineRecommender</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>jblas</artifactId>
            <version>${jblas.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- MongoDB drivers -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>
</project>
  • StreamingRecommender pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>recommender</artifactId>
        <groupId>com.lotuslaw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>StreamingRecommender</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
        </dependency>
        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <!-- MongoDB drivers -->
        <!-- Casbah, for connecting to MongoDB programmatically -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <!-- Connector between Spark and MongoDB -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
        <!-- redis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>
  • ContentRecommender pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>recommender</artifactId>
        <groupId>com.lotuslaw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>ContentRecommender</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>jblas</artifactId>
            <version>${jblas.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- MongoDB drivers -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>
</project>

Code for Each Module

  • DataLoader
package com.lotuslaw.recommender

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

import java.net.InetAddress

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.recommender
 * @create: 2021-08-23 22:49
 * @description:
 */

/**
 * Movie dataset; sample record with field meanings:
 * 260                                                               movie ID: mid
 * Star Wars: Episode IV - A New Hope (1977)                         movie name: name
 * Princess Leia is captured and held hostage by the evil            description: descri
 * 121 minutes                                                       duration: timelong
 * September 21, 2004                                                release date: issue
 * 1977                                                              year shot: shoot
 * English                                                           language: language
 * Action|Adventure|Sci-Fi                                           genres: genres
 * Mark Hamill|Harrison Ford|Carrie Fisher|Peter Cushing|Alec        cast: actors
 * George Lucas                                                      directors: directors
 */
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String, shoot: String, language: String,
                 genres: String, actors: String, directors: String)

/**
 * Ratings dataset; sample record:
 * 1,31,2.5,1260759144
 */
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)

/**
 * Tags dataset; sample record:
 * 15,1955,dentist,1193435061
 */
case class Tag(uid: Int, mid: Int, tag: String, timestamp: Int)

// Wrap the MongoDB and ES configuration in case classes
/**
 *
 * @param uri MongoDB connection URI
 * @param db  MongoDB database
 */
case class MongoConfig(uri: String, db: String)

/**
 *
 * @param httpHosts       HTTP host list
 * @param transportHosts  transport host list
 * @param index           the index to operate on
 * @param clustername     cluster name, es-cluster
 */
case class ESConfig(httpHosts: String, transportHosts: String, index: String, clustername: String)



object DataLoader {

  // Constants
  val MOVIE_DATA_PATH = "C:\\Users\\86188\\Desktop\\推薦系統課程\\5.推薦工程\\recommender\\DataLoader\\src\\main\\resources\\movies.csv"
  val RATING_DATA_PATH = "C:\\Users\\86188\\Desktop\\推薦系統課程\\5.推薦工程\\recommender\\DataLoader\\src\\main\\resources\\ratings.csv"
  val TAG_DATA_PATH = "C:\\Users\\86188\\Desktop\\推薦系統課程\\5.推薦工程\\recommender\\DataLoader\\src\\main\\resources\\tags.csv"

  val MONGODB_MOVIE_COLLECTION = "Movie"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_TAG_COLLECTION = "Tag"
  val ES_MOVIE_INDEX = "Movie"

  def main(args: Array[String]): Unit = {

    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://192.168.88.132:27017/recommender",
      "mongo.db" -> "recommender",
      "es.httpHosts" -> "linux:9200",
      "es.transportHosts" -> "linux:9300",
      "es.index" -> "recommender",
      "es.cluster.name" -> "es-cluster"
    )

    // Create a SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")

    // Create a SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    // Load the raw data
    val movieRDD = spark.sparkContext.textFile(MOVIE_DATA_PATH)
    val movieDF = movieRDD.map{
      item => {
        val attr = item.split("\\^")
        Movie(attr(0).toInt, attr(1).trim, attr(2).trim, attr(3).trim, attr(4).trim, attr(5).trim, attr(6).trim, attr(7).trim, attr(8).trim, attr(9).trim)
      }
    }.toDF()

    val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
    val ratingDF = ratingRDD.map{
      item => {
        val attr = item.split(",")
        Rating(attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
      }
    }.toDF()

    val tagRDD = spark.sparkContext.textFile(TAG_DATA_PATH)
    val tagDF = tagRDD.map{
      item => {
        val attr = item.split(",")
        Tag(attr(0).toInt, attr(1).toInt, attr(2).trim, attr(3).toInt)
      }
    }.toDF()

    implicit val mongoConfig: MongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
    // Save the data to MongoDB
    storeDataInMongoDB(movieDF, ratingDF, tagDF)

    // Preprocessing: attach each movie's tags as an extra column, tag1|tag2|tag3...
    import org.apache.spark.sql.functions._

    /**
     * mid, tags
     * tags: tag1|tag2|tag3...
     */
    val newTag = tagDF.groupBy($"mid")
      .agg(concat_ws("|", collect_set($"tag")).as("tags"))
      .select("mid", "tags")

    // Left-outer join newTag onto movie to merge the data
    val movieWithTagsDF = movieDF.join(newTag, Seq("mid"), "left")

    implicit val esConfig: ESConfig = ESConfig(config("es.httpHosts"), config("es.transportHosts"), config("es.index"), config("es.cluster.name"))

    // Save the data to ES
    storeDataInES(movieWithTagsDF)

    spark.stop()
  }

  def storeDataInMongoDB(movieDF: DataFrame, ratingDF: DataFrame, tagDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
    // Open a MongoDB connection
    val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))

    // If the collections already exist in MongoDB, drop them first
    mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).dropCollection()
    mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).dropCollection()
    mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).dropCollection()

    // Write the DataFrames into MongoDB collections
    movieDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    ratingDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    tagDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_TAG_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // Create indexes on the collections
    // 1 means ascending order
    mongoClient(mongoConfig.db)(MONGODB_MOVIE_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_TAG_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    mongoClient.close()
  }

  def storeDataInES(movieDF: DataFrame)(implicit eSConfig: ESConfig): Unit = {
    // Build the ES settings
    val settings: Settings = Settings.builder().put("cluster.name", eSConfig.clustername).build()

    // Create an ES transport client
    val esClient = new PreBuiltTransportClient(settings)
    val REGEX_HOST_PORT = "(.+):(\\d+)".r
    eSConfig.transportHosts.split(",").foreach{
      case REGEX_HOST_PORT(host: String, port: String) =>
        esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt))
    }

    // Clean up any leftover index first
    if(esClient.admin().indices().exists(new IndicesExistsRequest(eSConfig.index))
      .actionGet()
      .isExists
    ) {
      esClient.admin().indices().delete(new DeleteIndexRequest(eSConfig.index))
    }
    esClient.admin().indices().create(new CreateIndexRequest(eSConfig.index))
    movieDF.write
      .option("es.nodes", eSConfig.httpHosts)
      .option("es.http.timeout", "100m")
      .option("es.mapping.id", "mid")
      .option("es.nodes.wan.only","true")
      .mode("overwrite")
      .format("org.elasticsearch.spark.sql")
      .save(eSConfig.index + "/" + ES_MOVIE_INDEX)
  }
}
  • StatisticsRecommender
package com.lotuslaw.statistics

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.text.SimpleDateFormat
import java.util.Date

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.statistics
 * @create: 2021-08-24 11:24
 * @description:
 */

case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String, shoot: String, language: String,
                 genres: String, actors: String, directors: String)

case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

// A basic recommendation object
case class Recommendation(mid: Int, score: Double)

// A top-10 recommendation object per movie genre
case class GenresRecommendation(genres: String, recs: Seq[Recommendation])

object StatisticsRecommender {

  // Source collection names
  val MONGODB_MOVIE_COLLECTION = "Movie"
  val MONGODB_RATING_COLLECTION = "Rating"

  // Result collection names
  val RATE_MORE_MOVIES = "RateMoreMovies"
  val RATE_MORE_RECENTLY_MOVIES = "RateMoreRecentlyMovies"
  val AVERAGE_TOP_MOVIES = "AverageMovies"
  val GENRES_TOP_MOVIES = "GenresTopMovies"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://linux:27017/recommender",
      "mongo.db" -> "recommender"
    )

    // Create a SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")

    // Create a SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig: MongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Load data from MongoDB
    val ratingDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Rating]
      .toDF()

    val movieDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Movie]
      .toDF()

    // Register a temp view named ratings
    ratingDF.createOrReplaceTempView("ratings")

    // TODO: the different statistics-based recommendation results
    // 1. All-time popularity: movies with the most ratings overall; (mid, count)
    val rateMoreMoviesDF = spark.sql("select mid, count(mid) as count from ratings group by mid")
    // Write the result to the corresponding MongoDB collection
    storeDFInMongoDB(rateMoreMoviesDF, RATE_MORE_MOVIES)
    // 2. Recent popularity: bucket ratings by "yyyyMM" and count ratings per month
    // Date formatter
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    // Register a UDF that converts a timestamp into the yyyyMM format
    spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
    // Transform the raw data, dropping uid
    val ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth from ratings")
    ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")

    // Query each movie's per-month rating counts from ratingOfMonth: (mid, count, yearmonth)
    val rateMoreRecentlyMoviesDF = spark.sql("select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid order by yearmonth desc, count desc")
    // Save to MongoDB
    storeDFInMongoDB(rateMoreRecentlyMoviesDF, RATE_MORE_RECENTLY_MOVIES)
    // 3. Quality movies: average rating per movie
    val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")
    storeDFInMongoDB(averageMoviesDF, AVERAGE_TOP_MOVIES)
    // 4. Top movies per genre
    // Define all genres
    val genres = List("Action","Adventure","Animation","Comedy","Crime","Documentary","Drama","Family","Fantasy","Foreign","History","Horror","Music","Mystery","Romance","Science","Tv","Thriller","War","Western")
    // Join the average scores onto the movie table as an extra column (inner join)
    val movieWithScore = movieDF.join(averageMoviesDF, "mid")
    // Convert genres to an RDD so we can take a Cartesian product
    val genresRDD = spark.sparkContext.makeRDD(genres)
    // Compute the per-genre top 10, starting with a Cartesian product of genres and movies
    val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd)
      .filter{
        // Keep only the movies whose genres field contains the current genre
        case (genre, movieRow) => movieRow.getAs[String]("genres").toLowerCase.contains(genre.toLowerCase())
      }
      .map{
        case (genre, movieRow) => (genre, (movieRow.getAs[Int]("mid"), movieRow.getAs[Double]("avg")))
      }
      .groupByKey()
      .map{
        case (genre, items) => GenresRecommendation(genre, items.toList.sortWith(_._2>_._2).take(10).map{
          items => Recommendation(items._1, items._2)
        })
      }
      .toDF()
    storeDFInMongoDB(genresTopMoviesDF, GENRES_TOP_MOVIES)
    spark.stop()
  }

  def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit = {
    df.write
      .option("uri", mongoConfig.uri)
      .option("collection", collection_name)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
  }
}
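
The per-genre top 10 above is built with an RDD Cartesian product plus case-insensitive substring matching on the genres field. The same step can also be sketched with Spark SQL's explode and a window function, which avoids the Cartesian product; this is an alternative sketch (not the original project's approach), assumes the movieWithScore DataFrame from the code above, and splits genres exactly on "|" instead of matching substrings:

// Alternative sketch: per-genre top 10 via explode + window ranking
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val genresTop10 = movieWithScore
  .withColumn("genre", explode(split(col("genres"), "\\|")))  // one row per (movie, genre) pair
  .withColumn("rank", row_number().over(
    Window.partitionBy(col("genre")).orderBy(col("avg").desc)))
  .filter(col("rank") <= 10)  // keep the 10 best-rated movies per genre
  .select("genre", "mid", "avg")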
  • OfflineRecommender
package com.lotuslaw.offline

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.offline
 * @create: 2021-08-24 14:49
 * @description:
 */

// The rating-based latent factor model only needs the rating data
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

case class Recommendation(mid: Int, score: Double)

// Per-user recommendation list based on predicted ratings
case class UserRecs(uid: Int, recs: Seq[Recommendation])

// Movie similarity list based on the LFM movie feature vectors
case class MovieRecs(mid: Int, recs: Seq[Recommendation])

object OfflineRecommender {

  // Collection names and constants
  val MONGODB_RATING_COLLECTION = "Rating"

  val USER_RECS = "UserRecs"
  val MOVIE_RECS = "MovieRecs"

  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://linux:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf: SparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")

    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig: MongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Load data
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map{rating => (rating.uid, rating.mid, rating.score)}  // convert to an RDD and drop the timestamp
      .cache()

    // Extract all distinct uids and mids from the rating data
    val userRDD = ratingRDD.map(_._1).distinct()
    val movieRDD = ratingRDD.map(_._2).distinct()

    // Train the latent factor model
    val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
    val (rank, iterations, lambda) = (100, 5, 0.1)
    val model = ALS.train(trainData, rank, iterations, lambda)

    // Use the user and movie latent features to predict ratings and build each user's recommendation list
    // The Cartesian product of users and movies gives the full rating matrix to fill in
    val userMovies = userRDD.cartesian(movieRDD)

    // Predict ratings with the model's predict method
    val preRatings = model.predict(userMovies)

    val userRecs = preRatings
      .filter(_.rating > 0)  // keep only positive predicted ratings
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map{
        case (uid, recs) => UserRecs(uid, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1, x._2)))
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // Compute the movie similarity matrix from the movie latent features
    val movieFeatures = model.productFeatures.map{
      case (mid, features) => (mid, new DoubleMatrix(features))
    }

    // Compute pairwise similarities between all movies, starting with a Cartesian product
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // filter out self-pairs
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) =>
          val simScore = this.consinSim(a._2, b._2)
          (a._1, (b._1, simScore))
      }
      .filter(_._2._2>0.6)  // keep only pairs with similarity above 0.6
      .groupByKey()
      .map{
        case (mid, items) => MovieRecs(mid, items.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1, x._2)))
      }
      .toDF()

    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  // Cosine similarity between two vectors
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {
    movie1.dot(movie2) / (movie1.norm2() * movie2.norm2())
  }
}
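
The hyperparameters above are fixed at (rank, iterations, lambda) = (100, 5, 0.1). A minimal sketch of how they could instead be chosen by grid search on RMSE, assuming the rating RDD is first split into training and test sets with randomSplit(Array(0.8, 0.2)); the helper getRMSE and the candidate grids are illustrative, not part of the original project:

// Hypothetical grid search for the ALS hyperparameters
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
  val userProducts = data.map(r => (r.user, r.product))
  val predictions = model.predict(userProducts).map(r => ((r.user, r.product), r.rating))
  val observed = data.map(r => ((r.user, r.product), r.rating))
  // join actual and predicted ratings on (uid, mid) and take the root mean squared error
  math.sqrt(
    observed.join(predictions).map {
      case (_, (actual, predicted)) =>
        val err = actual - predicted
        err * err
    }.mean()
  )
}

def adjustALSParams(trainData: RDD[Rating], testData: RDD[Rating]): Unit = {
  val result = for (rank <- Array(50, 100, 200); lambda <- Array(0.001, 0.01, 0.1))
    yield {
      val model = ALS.train(trainData, rank, 5, lambda)
      (rank, lambda, getRMSE(model, testData))
    }
  // print the (rank, lambda) pair with the lowest RMSE
  println(result.minBy(_._3))
}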
  • StreamingRecommender
package com.lotuslaw.streaming

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.streaming
 * @create: 2021-08-24 16:29
 * @description:
 */

// Connection helper object (serializable), holding lazy Redis and MongoDB clients
object ConnHelper extends Serializable {
  lazy val jedis = new Jedis("linux")
  lazy val mongoClient: MongoClient = MongoClient(MongoClientURI("mongodb://linux:27017/recommender"))
}

case class MongoConfig(uri: String, db: String)
// Standard recommendation
case class Recommendation(mid: Int, score: Double)
// Per-user recommendations
case class UserRecs(uid: Int, recs: Seq[Recommendation])
// Movie similarity
case class MovieRecs(mid: Int, recs: Seq[Recommendation])

object StreamingRecommender {

  // Collection names and constants
  val MAX_USER_RATINGS_NUM = 20
  val MAX_SIM_MOVIES_NUM = 20
  val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://linux:27017/recommender",
      "mongo.db" -> "recommender",
      "kafka.topic" -> "recommender"
    )

    val sparkConf: SparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // Get a streaming context
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(2))  // batch duration

    import spark.implicits._

    implicit val mongoConfig: MongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Load the movie similarity matrix and broadcast it
    val simMovieMatrix = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECS_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRecs]
      .rdd
      .map { movieRecs => // convert to a map for fast similarity lookups
        (movieRecs.mid, movieRecs.recs.map(x=>(x.mid, x.score)).toMap)
      }.collectAsMap()

    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)

    // Kafka connection parameters
    val kafkaParam = Map(
      "bootstrap.servers" -> "linux:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "recommender",
      "auto.offset.reset" -> "latest"
    )
    // Create a DStream from Kafka
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam)
    )

    // Convert the raw UID|MID|SCORE|TIMESTAMP messages into a rating stream
    val ratingStream = kafkaStream.map {
      msg =>
        val attr = msg.value().split("\\|")
        (attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
    }

    // Streaming processing: the core algorithm
    ratingStream.foreachRDD{
      rdds => rdds.foreach{
        case (uid, mid, score, timestamp) =>
          println("rating data coming! >>>>>>>>>>>>>>>>>>")
          // Fetch the user's K most recent ratings from Redis, as Array[(mid, score)]
          val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis)

          // Take the N movies most similar to the current one from the similarity matrix as candidates, Array[mid]
          val candidateMovies = getTopSimMovies(MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value)

          // Compute a recommendation priority for each candidate to get the user's real-time list, Array[(mid, score)]
          val streamRecs = computeMovieScore(candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value)

          // Save the recommendations to MongoDB
          saveDataToMongoDB(uid, streamRecs)
      }
    }

    // Start receiving and processing the data
    ssc.start()
    println(">>>>>>>>>>>>>>> streaming started")
    ssc.awaitTermination()
  }

  // Redis returns Java collections; import the conversions so map can be used on them
  import scala.collection.JavaConversions._
  def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
    // Read from Redis: each user's ratings live in a list keyed uid:UID whose entries are MID:SCORE
    jedis.lrange("uid:" + uid, 0, num-1)
      .map{
        item =>
          val attr = item.split("\\:")
          (attr(0).trim.toInt, attr(1).trim.toDouble)
      }
      .toArray
  }

  /**
   * Get the num movies most similar to the current one, as candidates
   * @param num        number of similar movies
   * @param mid        current movie ID
   * @param uid        current rating user's ID
   * @param simMovies  the similarity matrix
   * @return the candidate list after filtering
   */
  def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])(implicit mongoConfig: MongoConfig): Array[Int] = {
    // Take all movies similar to mid from the similarity matrix
    val allSimMovies = simMovies(mid).toArray

    // Query MongoDB for the movies the user has already rated
    val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
      .find(MongoDBObject("uid" -> uid))
      .toArray
      .map{
        item => item.get("mid").toString.toInt
      }

    // Filter out the movies already seen to get the output list
    allSimMovies.filter(x => ! ratingExist.contains(x._1))
      .sortWith(_._2>_._2)
      .take(num)
      .map(x => x._1)
  }

  def computeMovieScore(candidateMovies: Array[Int], userRecentlyRatings: Array[(Int, Double)], simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = {
    // An ArrayBuffer to hold each candidate movie's base score
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    // HashMaps to hold each candidate movie's boost and penalty counts
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()
    for (candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings) {
      // Similarity between the candidate movie and the recently rated movie
      val simScore = getMoviesSimScore(candidateMovie, userRecentlyRating._1, simMovies)
      if (simScore > 0.7) {
        // Base recommendation score for the candidate movie
        scores += ((candidateMovie, simScore * userRecentlyRating._2))
        if (userRecentlyRating._2 > 3) {
          increMap(candidateMovie) = increMap.getOrElse(candidateMovie, 0) + 1
        } else {
          decreMap(candidateMovie) = decreMap.getOrElse(candidateMovie, 0) + 1
        }
      }
    }
    // Group the scores by candidate mid and apply the formula to get the final recommendation score
    scores.groupBy(_._1).map{
      // after groupBy: Map(mid -> ArrayBuffer[(mid, score)])
      case (mid, scoreList) =>
        (mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrElse(mid, 1)) - log(decreMap.getOrElse(mid, 1)))
    }.toArray
  }

  // Similarity between two movies, looked up in the similarity matrix
  def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
    simMovies.get(mid1) match {
      case Some(sims) => sims.get(mid2) match {
        case Some(score) => score
        case None => 0.0
      }
      case None => 0.0
    }
  }

  // Base-10 logarithm
  def log(m: Int): Double = {
    val N = 10
    math.log(m) / math.log(N)
  }

  def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit = {
    // Handle to the StreamRecs collection
    val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)

    // If the collection already has an entry for this uid, remove it first
    streamRecsCollection.findAndRemove(MongoDBObject("uid" -> uid))
    // Insert the new recommendations
    streamRecsCollection.insert(MongoDBObject("uid" -> uid, "recs" -> streamRecs.map(x=>MongoDBObject("mid"->x._1, "score"->x._2))))
  }
}
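
To exercise the streaming job end to end, a rating event must arrive on the Kafka topic, and the user's recent ratings must already sit in the Redis list uid:UID (entries formatted MID:SCORE). A minimal test-producer sketch, assuming the linux:9092 broker and recommender topic from the config above; the object name RatingEventProducer is illustrative:

// Hypothetical test producer: sends one UID|MID|SCORE|TIMESTAMP event
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object RatingEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "linux:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // uid=1 rates mid=31 with score 4.5 at the current epoch second
    val event = s"1|31|4.5|${System.currentTimeMillis() / 1000}"
    producer.send(new ProducerRecord[String, String]("recommender", event))
    producer.close()
  }
}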
  • ContentRecommender
package com.lotuslaw.content

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

/**
 * @author: lotuslaw
 * @version: V1.0
 * @package: com.lotuslaw.content
 * @create: 2021-08-24 19:37
 * @description:
 */

// The data source needed here is the movie content information
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String, shoot: String, language: String,
                 genres: String, actors: String, directors: String)

case class MongoConfig(uri: String, db: String)

case class Recommendation(mid: Int, score: Double)

// Movie similarity list based on feature vectors extracted from the movie content
case class MovieRecs(mid: Int, recs: Seq[Recommendation])

object ContentRecommender {

  // Constants and collection names
  val MONGODB_MOVIE_COLLECTION = "Movie"

  val CONTENT_MOVIE_RECS = "ContentMovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://linux:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf: SparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")

    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig: MongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // Load and preprocess the data
    val movieTagsDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Movie]
      .map(
        x => (x.mid, x.name, x.genres.map(c=>if(c=='|') ' ' else c))
      )
      .toDF("mid", "name", "genres")
      .cache()

    // TODO: extract movie feature vectors from the content information
    // Core part: use TF-IDF to extract movie feature vectors from the content

    // Create a tokenizer; it splits on whitespace by default
    val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")

    // Transform the raw data with the tokenizer, producing a new words column
    val wordsData = tokenizer.transform(movieTagsDF)

    // HashingTF maps a sequence of words to its term-frequency vector
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
    val featurizeData = hashingTF.transform(wordsData)

    // IDF builds an IDF model
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // Fit the IDF model to get each term's inverse document frequency
    val idfModel = idf.fit(featurizeData)

    // Apply the model to the featurized data to get each term's TF-IDF as the new feature vector
    val rescaledData = idfModel.transform(featurizeData)

    val movieFeatures = rescaledData.map(
      row => (row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray)
    )
      .rdd
      .map(
        x => (x._1, new DoubleMatrix(x._2))
      )

    // Compute pairwise similarities between all movies, starting with a Cartesian product
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // filter out self-pairs
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) =>
          val simScore = this.consinSim(a._2, b._2)
          (a._1, (b._1, simScore))
      }
      .filter(_._2._2>0.6)  // keep only pairs with similarity above 0.6
      .groupByKey()
      .map{
        case (mid, items) => MovieRecs(mid, items.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1, x._2)))
      }
      .toDF()

    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", CONTENT_MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  // Cosine similarity between two vectors
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix): Double = {
    movie1.dot(movie2) / (movie1.norm2() * movie2.norm2())
  }
}
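
As noted in the overview, the recall step of the content module is not implemented; the job only materializes the ContentMovieRecs collection. A minimal recall sketch that reads the collection back with Casbah, assuming the same connection settings as above; the object and method names are illustrative:

// Hypothetical content-based recall: look up the precomputed similar-movie list for one mid
import com.mongodb.DBObject
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}

import scala.collection.JavaConverters._

object ContentRecall {
  def recall(mid: Int, num: Int): Seq[(Int, Double)] = {
    val mongoClient = MongoClient(MongoClientURI("mongodb://linux:27017/recommender"))
    try {
      mongoClient("recommender")("ContentMovieRecs")
        .findOne(MongoDBObject("mid" -> mid))
        .toSeq
        .flatMap { doc =>
          // recs was saved as an array of {mid, score} sub-documents, already sorted by similarity
          doc.get("recs").asInstanceOf[java.util.List[DBObject]].asScala.map { rec =>
            (rec.get("mid").toString.toInt, rec.get("score").toString.toDouble)
          }
        }
        .take(num)
    } finally {
      mongoClient.close()
    }
  }
}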