1. 程式人生 > >使用Mahout實現協同過濾 spark

使用Mahout實現協同過濾 spark

Mahout使用了Taste來提高協同過濾演算法的實現,它是一個基於Java實現的可擴充套件的,高效的推薦引擎。Taste既實現了最基本的基 於使用者的和基於內容的推薦演算法,同時也提供了擴充套件介面,使使用者可以方便的定義和實現自己的推薦演算法。同時,Taste不僅僅只適用於Java應用程式,它 可以作為內部伺服器的一個元件以HTTP和Web Service的形式向外界提供推薦的邏輯。Taste的設計使它能滿足企業對推薦引擎在效能、靈活性和可擴充套件性等方面的要求。

介面相關介紹

Taste主要包括以下幾個介面:

  • DataModel 是使用者喜好資訊的抽象介面,它的具體實現支援從任意型別的資料來源抽取使用者喜好資訊。Taste 預設提供 JDBCDataModel 和 FileDataModel,分別支援從資料庫和檔案中讀取使用者的喜好資訊。
  • UserSimilarityItemSimilarity 。UserSimilarity 用於定義兩個使用者間的相似度,它是基於協同過濾的推薦引擎的核心部分,可以用來計算使用者的“鄰居”,這裡我們將與當前使用者口味相似的使用者稱為他的鄰居。ItemSimilarity 類似的,計算內容之間的相似度。
  • UserNeighborhood 用於基於使用者相似度的推薦方法中,推薦的內容是基於找到與當前使用者喜好相似的鄰居使用者的方式產生的。UserNeighborhood 定義了確定鄰居使用者的方法,具體實現一般是基於 UserSimilarity 計算得到的。
  • Recommender 是推薦引擎的抽象介面,Taste 中的核心元件。程式中,為它提供一個 DataModel,它可以計算出對不同使用者的推薦內容。實際應用中,主要使用它的實現類 GenericUserBasedRecommender 或者 GenericItemBasedRecommender,分別實現基於使用者相似度的推薦引擎或者基於內容的推薦引擎。
  • RecommenderEvaluator :評分器。
  • RecommenderIRStatsEvaluator :蒐集推薦效能相關的指標,包括準確率、召回率等等。

目前,Mahout為DataModel提供了以下幾種實現:

  • org.apache.mahout.cf.taste.impl.model.GenericDataModel
  • org.apache.mahout.cf.taste.impl.model.GenericBooleanPrefDataModel
  • org.apache.mahout.cf.taste.impl.model.PlusAnonymousUserDataModel
  • org.apache.mahout.cf.taste.impl.model.file.FileDataModel
  • org.apache.mahout.cf.taste.impl.model.hbase.HBaseDataModel
  • org.apache.mahout.cf.taste.impl.model.cassandra.CassandraDataModel
  • org.apache.mahout.cf.taste.impl.model.mongodb.MongoDBDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.SQL92JDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.MySQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.PostgreSQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.GenericJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.SQL92BooleanPrefJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.MySQLBooleanPrefJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.PostgreBooleanPrefSQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.ReloadFromJDBCDataModel

從類名上就可以大概猜出來每個DataModel的用途,奇怪的是竟然沒有HDFS的DataModel,有人實現了一個,請參考 MAHOUT-1579

UserSimilarityItemSimilarity 相似度實現有以下幾種:

  • CityBlockSimilarity :基於Manhattan距離相似度
  • EuclideanDistanceSimilarity :基於歐幾里德距離計算相似度
  • LogLikelihoodSimilarity :基於對數似然比的相似度
  • PearsonCorrelationSimilarity :基於皮爾遜相關係數計算相似度
  • SpearmanCorrelationSimilarity :基於皮爾斯曼相關係數相似度
  • TanimotoCoefficientSimilarity :基於谷本系數計算相似度
  • UncenteredCosineSimilarity :計算 Cosine 相似度

以上相似度的說明,請參考Mahout推薦引擎介紹。

UserNeighborhood 主要實現有兩種:

  • NearestNUserNeighborhood:對每個使用者取固定數量N個最近鄰居
  • ThresholdUserNeighborhood:對每個使用者基於一定的限制,取落在相似度限制以內的所有使用者為鄰居

Recommender分為以下幾種實現:

  • GenericUserBasedRecommender:基於使用者的推薦引擎
  • GenericBooleanPrefUserBasedRecommender:基於使用者的無偏好值推薦引擎
  • GenericItemBasedRecommender:基於物品的推薦引擎
  • GenericBooleanPrefItemBasedRecommender:基於物品的無偏好值推薦引擎

RecommenderEvaluator有以下幾種實現:

  • AverageAbsoluteDifferenceRecommenderEvaluator :計算平均差值
  • RMSRecommenderEvaluator :計算均方根差

RecommenderIRStatsEvaluator的實現類是GenericRecommenderIRStatsEvaluator。

單機執行

首先,需要在maven中加入對mahout的依賴:

<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-integration</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-examples</artifactId>
<version>0.9</version>
</dependency>

基於使用者的推薦,以FileDataModel為例:

File modelFile modelFile = new File("intro.csv");

DataModel model = new FileDataModel(modelFile);

//使用者相似度,使用基於皮爾遜相關係數計算相似度
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);

//選擇鄰居使用者,使用NearestNUserNeighborhood實現UserNeighborhood介面,選擇鄰近的4個使用者
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);

Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);

//給使用者1推薦4個物品
List<RecommendedItem> recommendations = recommender.recommend(1, 4);

for (RecommendedItem recommendation : recommendations) {
    System.out.println(recommendation);
}

注意:

FileDataModel要求輸入檔案中的欄位分隔符為逗號或者製表符,如果你想使用其他分隔符,你可以擴充套件一個FileDataModel的實現,例如,mahout中已經提供了一個解析MoiveLens的資料集(分隔符為 :: )的實現GroupLensDataModel。

對相同使用者重複獲得推薦結果,我們可以改用CachingRecommender來包裝GenericUserBasedRecommender物件,將推薦結果快取起來:

Recommender cachingRecommender = new CachingRecommender(recommender);

上面程式碼可以在main方法中直接執行,然後,我們可以獲取推薦模型的評分:

//使用平均絕對差值獲得評分
RecommenderEvaluator evaluator = new AverageAbsoluteDifferenceRecommenderEvaluator();
// 用RecommenderBuilder構建推薦引擎
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
@Override
public Recommender buildRecommender(DataModel model) throws TasteException {
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
return new GenericUserBasedRecommender(model, neighborhood, similarity);
}
};
// Use 70% of the data to train; test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);

接下來,可以獲取推薦結果的查準率和召回率:

RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
@Override
public Recommender buildRecommender(DataModel model) throws TasteException {
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
return new GenericUserBasedRecommender(model, neighborhood, similarity);
}
};
// 計算推薦4個結果時的查準率和召回率
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder,null, model, null, 4,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,1.0);
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());

如果是基於物品的推薦,程式碼大體相似,只是沒有了UserNeighborhood,然後將上面程式碼中的User換成Item即可,完整程式碼如下:

File modelFile modelFile = new File("intro.csv");
DataModel model = new FileDataModel(new File(file));
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
    @Override
    public Recommender buildRecommender(DataModel model) throws TasteException {
ItemSimilarity similarity = new PearsonCorrelationSimilarity(model);
return new GenericItemBasedRecommender(model, similarity);
    }
};
//獲取推薦結果
List<RecommendedItem> recommendations = recommenderBuilder.buildRecommender(model).recommend(1, 4);
for (RecommendedItem recommendation : recommendations) {
    System.out.println(recommendation);
}
//計算評分
RecommenderEvaluator evaluator =
new AverageAbsoluteDifferenceRecommenderEvaluator();
// Use 70% of the data to train; test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);
//計算查全率和查準率
RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Evaluate precision and recall "at 2":
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder,
null, model, null, 4,
GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD,
1.0);
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());

在Spark中執行

在Spark中執行,需要將Mahout相關的jar新增到Spark的classpath中,修改/etc/spark/conf/spark-env.sh,新增下面兩行程式碼:

SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/mahout/lib/*"
SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/mahout/*"

然後,以本地模式在spark-shell中執行下面程式碼互動測試:

//注意:這裡是本地目錄
val model = new FileDataModel(new File("intro.csv"))

val evaluator = new RMSRecommenderEvaluator()
val recommenderBuilder = new RecommenderBuilder {
  override def buildRecommender(dataModel: DataModel): Recommender = {
    val similarity = new LogLikelihoodSimilarity(dataModel)
    new GenericItemBasedRecommender(dataModel, similarity)
  }
}

val score = evaluator.evaluate(recommenderBuilder, null, model, 0.95, 0.05)
println(s"Score=$score")

val recommender=recommenderBuilder.buildRecommender(model)
val users=trainingRatings.map(_.user).distinct().take(20)

import scala.collection.JavaConversions._

val result=users.par.map{user=>
  user+","+recommender.recommend(user,40).map(_.getItemID).mkString(",")
}

分散式執行

Mahout提供了 org.apache.mahout.cf.taste.hadoop.item.RecommenderJob 類以MapReduce的方式來實現基於物品的協同過濾,檢視該類的使用說明:

$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob
15/06/10 16:19:34 ERROR common.AbstractJob: Missing required option --similarityClassname
Missing required option --similarityClassname
Usage:
 [--input <input> --output <output> --numRecommendations <numRecommendations>
--usersFile <usersFile> --itemsFile <itemsFile> --filterFile <filterFile>
--booleanData <booleanData> --maxPrefsPerUser <maxPrefsPerUser>
--minPrefsPerUser <minPrefsPerUser> --maxSimilaritiesPerItem
<maxSimilaritiesPerItem> --maxPrefsInItemSimilarity <maxPrefsInItemSimilarity>
--similarityClassname <similarityClassname> --threshold <threshold>
--outputPathForSimilarityMatrix <outputPathForSimilarityMatrix> --randomSeed
<randomSeed> --sequencefileOutput --help --tempDir <tempDir> --startPhase
<startPhase> --endPhase <endPhase>]
--similarityClassname (-s) similarityClassname    Name of distributed
similarity measures class to
instantiate, alternatively
use one of the predefined
similarities
([SIMILARITY_COOCCURRENCE,
SIMILARITY_LOGLIKELIHOOD,
SIMILARITY_TANIMOTO_COEFFICIEN
T, SIMILARITY_CITY_BLOCK,
SIMILARITY_COSINE,
SIMILARITY_PEARSON_CORRELATION
,
SIMILARITY_EUCLIDEAN_DISTANCE]
)

可見,該類可以接收的命令列引數如下:

  • --input(path) : 儲存使用者偏好資料的目錄,該目錄下可以包含一個或多個儲存使用者偏好資料的文字檔案;
  • --output(path) : 結算結果的輸出目錄
  • --numRecommendations (integer) : 為每個使用者推薦的item數量,預設為10
  • --usersFile (path) : 指定一個包含了一個或多個儲存userID的檔案路徑,僅為該路徑下所有檔案包含的userID做推薦計算 (該選項可選)
  • --itemsFile (path) : 指定一個包含了一個或多個儲存itemID的檔案路徑,僅為該路徑下所有檔案包含的itemID做推薦計算 (該選項可選)
  • --filterFile (path) : 指定一個路徑,該路徑下的檔案包含了 [userID,itemID] 值對,userID和itemID用逗號分隔。計算結果將不會為user推薦 [userID,itemID] 值對中包含的item (該選項可選)
  • --booleanData (boolean) : 如果輸入資料不包含偏好數值,則將該引數設定為true,預設為false
  • --maxPrefsPerUser (integer) : 在最後計算推薦結果的階段,針對每一個user使用的偏好資料的最大數量,預設為10
  • --minPrefsPerUser (integer) : 在相似度計算中,忽略所有偏好資料量少於該值的使用者,預設為1
  • --maxSimilaritiesPerItem (integer) : 針對每個item的相似度最大值,預設為100
  • --maxPrefsPerUserInItemSimilarity (integer) : 在item相似度計算階段,針對每個使用者考慮的偏好資料最大數量,預設為1000
  • --similarityClassname (classname) : 向量相似度計算類
  • outputPathForSimilarityMatrix :SimilarityMatrix輸出目錄
  • --randomSeed :隨機種子 -- sequencefileOutput :序列檔案輸出路徑
  • --tempDir (path) : 儲存臨時檔案的目錄,預設為當前使用者的home目錄下的temp目錄
  • --startPhase
  • --endPhase
  • --threshold (double) : 忽略相似度低於該閥值的item對

一個例子如下,使用SIMILARITY_LOGLIKELIHOOD相似度推薦物品:

$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input /tmp/mahout/part-00000 --output /tmp/mahout-out  -s SIMILARITY_LOGLIKELIHOOD

上面命令執行完成之後,會在當前使用者的hdfs主目錄生成temp目錄,該目錄可由 --tempDir (path) 引數設定:

$ hadoop fs -ls temp
Found 10 items
-rw-r--r--   3 root hadoop          7 2015-06-10 14:42 temp/maxValues.bin
-rw-r--r--   3 root hadoop    5522717 2015-06-10 14:42 temp/norms.bin
drwxr-xr-x   - root hadoop          0 2015-06-10 14:41 temp/notUsed
-rw-r--r--   3 root hadoop          7 2015-06-10 14:42 temp/numNonZeroEntries.bin
-rw-r--r--   3 root hadoop    3452222 2015-06-10 14:41 temp/observationsPerColumn.bin
drwxr-xr-x   - root hadoop          0 2015-06-10 14:47 temp/pairwiseSimilarity
drwxr-xr-x   - root hadoop          0 2015-06-10 14:52 temp/partialMultiply
drwxr-xr-x   - root hadoop          0 2015-06-10 14:39 temp/preparePreferenceMatrix
drwxr-xr-x   - root hadoop          0 2015-06-10 14:50 temp/similarityMatrix
drwxr-xr-x   - root hadoop          0 2015-06-10 14:42 temp/weights

觀察yarn的管理介面,該命令會生成9個任務,任務名稱依次是:

  • PreparePreferenceMatrixJob-ItemIDIndexMapper-Reducer
  • PreparePreferenceMatrixJob-ToItemPrefsMapper-Reducer
  • PreparePreferenceMatrixJob-ToItemVectorsMapper-Reducer
  • RowSimilarityJob-CountObservationsMapper-Reducer
  • RowSimilarityJob-VectorNormMapper-Reducer
  • RowSimilarityJob-CooccurrencesMapper-Reducer
  • RowSimilarityJob-UnsymmetrifyMapper-Reducer
  • partialMultiply
  • RecommenderJob-PartialMultiplyMapper-Reducer

從任務名稱,大概可以知道每個任務在做什麼,如果你的輸入引數不一樣,生成的任務數可能不一樣,這個需要測試一下才能確認。

在hdfs上檢視輸出的結果:

843 [10709679:4.8334665,8389878:4.833426,9133835:4.7503786,10366169:4.7503185,9007487:4.750272,8149253:4.7501993,10366165:4.750115,9780049:4.750108,8581254:4.750071,10456307:4.7500467]
6253    [10117445:3.0375953,10340299:3.0340924,8321090:3.0340924,10086615:3.032164,10436801:3.0187714,9668385:3.0141575,8502110:3.013954,10476325:3.0074399,10318667:3.0004222,8320987:3.0003839]

使用Java API方式執行:

StringBuilder sb = new StringBuilder();
sb.append("--input ").append(inPath);
sb.append(" --output ").append(outPath);
sb.append(" --tempDir ").append(tmpPath);
sb.append(" --booleanData true");
sb.append(" --similarityClassname 
org.apache.mahout.math.hadoop.similarity.
cooccurrence.measures.EuclideanDistanceSimilarity");
args = sb.toString().split(" ");

JobConf jobConf = new JobConf(conf);
jobConf.setJobName("MahoutTest");

RecommenderJob job = new RecommenderJob();
job.setConf(conf);
job.run(args);

在Scala或者Spark中,可以以Java API或者命令方式執行,最後還可以通過Spark來處理推薦的結果,例如:過濾、去重、補足資料,這部分內容不做介紹。

http://www.tuicool.com/articles/FzmQziz