1. 程式人生 > >深入理解mahout基於hadoop的協同過濾流程

深入理解mahout基於hadoop的協同過濾流程

mahout版本為mahout-distribution-0.9

mahout基於hadoop協同過濾(itembased)觸發類為org.apache.mahout.cf.taste.hadoop.item.RecommenderJob。

執行RecommenderJob時大概需要以下引數:

--input <input> 
--output <output> 
--numRecommendations   Number of recommendations per user
--usersFile   File of users to recommend for
--itemsFileFile of items to recommend for
 --filterFileFile containing comma-separated userID,itemID pairs. Used to exclude the item from the recommendations for that user (optional)  
--booleanData Treat input as without pref values 
--similarityClassname (-s) similarityClassname     [SIMILARITY_COOCCURRENCE,SIMILARITY_LOGLIKELIHOOD,
     SIMILARITY_TANIMOTO_COEFFICIENT,SIMILARITY_CITY_BLOCK,
     SIMILARITY_COSINE,SIMILARITY_PEARSON_CORRELATION,
     SIMILARITY_EUCLIDEAN_DISTANCE]
--maxPrefsPerUserMaximum number of preferences considered per user in final recommendation phase           
--minPrefsPerUser ignore users with less preferences than this in the similarity computation
 --maxSimilaritiesPerItem  Maximum number of similarities considered per item 
--maxPrefsInItemSimilaritymax number of preferences to consider per user or item in the item similarity computation phase, users or items withmore preferences will be sampled down
 --threshold <threshold>            discard item pairs with a similarity value below this 
--outputPathForSimilarityMatrix  write the item similarity matrix to this path (optional)
--randomSeed    use this seed for sampling 
--sequencefileOutput  write the output into a SequenceFile instead of a text file
--help 
--tempDir  Intermediate output directory
--startPhase  First phase to run default 0
--endPhaseLast phase to run  

                                       

mahout協同過濾輸入格式為(userid,itemid,preference)。RecommenderJob流程主要包括以下幾部分,我們會依次進行分析。

1、PreparePreferenceMatrixJob作業

所在包為org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob,開啟此類可以看到,此作業包含3個子作業。

1.1 itemIDIndex

此作業會執行ItemIDIndexMapper.class作為map類,執行ItemIDIndexReducer.class作為reduce類。

ItemIDIndexMapper.class作用是將long型的itemid轉為int型內部索引。輸入格式為(userid,itemid,preference),輸出格式為(index,itemid)。

結果會儲存到tempDir/preparePreferenceMatrix/itemIDIndex中。

1.2 toUserVectors

此作業執行ToItemPrefsMapper.class作為map類,ToUserVectorsReducer.class作為reduce類。

toUserVectors作用是將使用者偏好寫成每個使用者的偏好向量。輸入格式為(userid,temid,preference>),輸出格式為 (userId, VectorWritable<itemId, preference

>)

結果會儲存到tempDir/preparePreferenceMatrix/userVectors檔案中。

1.3 toItemVectors

此作業執行ToItemVectorsMapper.class作為map類,ToItemVectorsReducer.class作為reduce類。

toItemVectors作用是建立專案的評分矩陣。輸入為toUserVectors作業的執行結果userVectors格式為(userId, VectorWritable<itemId,preference>),輸出為(itemId,VectorWritable<userId,preference>)。

結果會儲存到tempDir/preparePreferenceMatrix/ratingMatrix檔案中。

2、 RowSimilarityJob作業

所在包為package org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob,開啟此類進行分析。

2.1 countObservations

此作業執行CountObservationsMapper.class作為map類,執行SumObservationsReducer.class作為reduce類。

countObservations作用是計算出每個使用者的評定物品數。輸入為1.3中的結果ratingMatrix格式為(itemId,VectorWritable<userId,preference>),輸出格式為(vector<userid,count>)。

計算結果儲存在tempDir/observationsPerColumn.bin檔案中(此檔案讀取需要使用org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors中的read函式)。

2.2 normsAndTranspose

此作業map函式為VectorNormMapper.class,reduce函式為MergeVectorsReducer.class。

normsAndTranspose作用是將首先將ratingMatrix(itemId,VectorWritable<userId,preference>)資料進行normalize化,之後資料寫成(userId, VectorWritable<itemId,preference>)格式。在此步驟的map階段計算了每個專案preference的norms值,並將數值存到tempDir/norms.bin檔案中,norms計算方式依相似度函式而定,具體實現見相似度函式中的norm函式。此處以EuclideanDistanceSimilarity為例,計算方式如下:

 @Override
  public double norm(Vector vector) {
    double norm = 0;
    for (Vector.Element e : vector.nonZeroes()) {
      double value = e.get();
      norm += value * value;
    }
    return norm;
  }

normalize的計算方式與所選的相似度就算方式有關,此處以SIMILARITY_EUCLIDEAN_DISTANCE為例,計算方式如下:

@Override
  public Vector normalize(Vector vector) {
    return vector;
  }
normalizeSIMILARITY_EUCLIDEAN_DISTANCE為例,計算方式如下:
@Override
  public Vector normalize(Vector vector) {
    if (vector.getNumNondefaultElements() == 0) {
      return vector;
    }

    // center non-zero elements
    double average = vector.norm(1) / vector.getNumNonZeroElements();
    for (Vector.Element e : vector.nonZeroes()) {
      e.set(e.get() - average);
    }
    return super.normalize(vector);
  }

計算結果儲存在tempDir/weights檔案中。

2.3 pairwiseSimilarity

此作業以 CooccurrencesMapper.class作為map類,以SimilarityReducer.class作為reduce類。

pairwiseSimilarity作用為計算item之間的相似度。資料輸入為weights檔案格式為(userId, VectorWritable<itemId,preference>),輸出為(itemM,VectorWritable<itemM+,similarscore>)(每條專案為key的資料中其它專案index一定比當前專案大,比如M對應M+1,M+2,....),由此可以看出此作業輸出為對角陣的上半部分。

此作業計算相似度的方式為:在map階段完成aggregate計算,計算方式依相似度函式而定,此處以SIMILARITY_EUCLIDEAN_DISTANCE為例,計算方式如下,

 @Override
  public double aggregate(double valueA, double nonZeroValueB) {
    return valueA * nonZeroValueB;
  }

再reduce階段通過使用map階段計算出的aggregate值和2.2作業的norms值,得到item之間的相似度,相似度的計算方式依相似度函式而定,此處以EuclideanDistanceSimilarity為例,計算方式如下:

@Override
  public double similarity(double dots, double normA, double normB, int numberOfColumns) {
    // Arg can't be negative in theory, but can in practice due to rounding, so cap it.
    // Also note that normA / normB are actually the squares of the norms.
    double euclideanDistance = Math.sqrt(Math.max(0.0, normA - 2 * dots + normB));
    return 1.0 / (1.0 + euclideanDistance);
  }

此作業的計算結果儲存在tempDir/pairwiseSimilarity中。

2.4 asMatrix

此作業的map類為UnsymmetrifyMapper.class,reduce類為MergeToTopKSimilaritiesReducer.class。

asMatrix的作用是將2.3的矩陣矩陣整理為完整的相似度矩陣。輸入為2.3的pairwiseSimilarity檔案,格式為(itemM,VectorWritable<itemM+,similarscore>),輸出為(itemM,VectorWritable((item1,similarscore),(item2,similarscore),....)。

asMatrix作業的工作方式為:在map階段,計算pairwiseSimilarity中每個item的maxSimilaritiesPerRow(預設為100)個最相似的item,使用TopElementsQueue函式。在reduce階段,首先將斜半矩陣合併成全矩陣,然後使用Vectors.topKElements()函式,取出最相似的maxSimilaritiesPerRow個item。此處可以看出map階段相當於reduce的預處理階段,將對角矩陣進行預處理,減少了reduce的計算量。

asMatrix計算結果儲存在tempDir/similarityMatrix檔案中。

2.5 outputSimilarityMatrix

如果設定了outputPathForSimilarityMatrix引數,此作業將被執行。

outputSimilarityMatrix作用是將2.4產生的結果儲存到使用者通過outputPathForSimilarityMatrix引數設定的目錄裡。

3、partialMultiply作業

此作業也位於org.apache.mahout.cf.taste.hadoop.item.RecommenderJob包中,位於run函式中。分析此作業程式碼:

if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job partialMultiply = new Job(getConf(), "partialMultiply");
      Configuration partialMultiplyConf = partialMultiply.getConfiguration();

      MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class,
                                  SimilarityMatrixRowWrapperMapper.class);
      MultipleInputs.addInputPath(partialMultiply, new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
          SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
      partialMultiply.setJarByClass(ToVectorAndPrefReducer.class);
      partialMultiply.setMapOutputKeyClass(VarIntWritable.class);
      partialMultiply.setMapOutputValueClass(VectorOrPrefWritable.class);
      partialMultiply.setReducerClass(ToVectorAndPrefReducer.class);
      partialMultiply.setOutputFormatClass(SequenceFileOutputFormat.class);
      partialMultiply.setOutputKeyClass(VarIntWritable.class);
      partialMultiply.setOutputValueClass(VectorAndPrefsWritable.class);
      partialMultiplyConf.setBoolean("mapred.compress.map.output", true);
      partialMultiplyConf.set("mapred.output.dir", partialMultiplyPath.toString());

      if (usersFile != null) {
        partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
      }
      partialMultiplyConf.setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED, maxPrefsPerUser);

      boolean succeeded = partialMultiply.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

分析上面程式碼可知,此作業通過MultipleInputs.addInputPath()函式執行了SimilarityMatrixRowWrapperMapper.class和UserVectorSplitterMapper.class作為reduce的輸入,reduce階段執行了ToVectorAndPrefReducer.class類。

3.1 SimilarityMatrixRowWrapperMapper.class

此map作用是將similarity matrix每一行對映VectorOrPrefWritable格式。此map輸入格式為(itemid,VectorWritable<itemid,similarscore>),輸出格式為(itemid,VectorOrPrefWritable)。VectorOrPrefWritable類包含三個成員分別是:vector,long userid,float value。在此map階段只是書寫了VectorOrPrefWritable類的vector。

3.2 UserVectorSplitterMapper.class

此map作用是使用者向量轉化為VectorOrPrefWritable物件表示形式。此map輸入為1.2作業結果即tempDir/preparePreferenceMatrix/userVectors檔案,輸出格式為(itemid,VectorOrPrefWritable)。此map階段只書寫了VectorOrPrefWritable類的userid和value。

3.3 ToVectorAndPrefReducer

這是partialMultiply作業的reduce階段,此階段的輸入是3.1 map階段和3.2 map階段的輸出結果,格式為(itemid,VectorOrPrefWritable),輸出格式為(itemid,VectorAndPrefsWritable)。此階段的工作方式為,將3.1 map和3.2 map傳來的資料,分別寫入VectorAndPrefsWritable中的List<Long> userIDs,List<Float> prefValues,Vector similarityMatrixColumn。

此作業的結果儲存在tempDir/partialMultiply檔案中

4、itemFiltering作業

當用戶設定了filterFile檔案時,此作業才會執行。此作業的作用是過濾掉檔案中設定的item。

5、aggregateAndRecommend作業

此作業的map類為PartialMultiplyMapper,reduce類為AggregateAndRecommendReducer。

此作業的作用是提取出推薦結果。最後的推薦結果會儲存到設定的output目錄下。

當沒有設定filterFile檔案時,此作業的輸入為3.3作業的輸出檔案tempDir/partialMultiply格式為(itemid,VectorAndPrefsWritable),輸出格式為(userid,RecommendedItemsWritable)

此作業工作方式為:map階段對映每個使用者的preferences值和對應的item相似向量,即表示形式為(userIDWritable, PrefAndSimilarityColumnWritable),PrefAndSimilarityColumnWritable類裡面記錄了float prefValue和Vector similarityColumn。

reduce階段首先判斷booleanData引數,如果是true,執行reduceBooleanData,如果是false,執行reduceNonBooleanData,一般預設為false。reduce階段計算方式如下所示:

/**
 * <p>computes prediction values for each user</p>
 *
 * <pre>
 * u = a user
 * i = an item not yet rated by u
 * N = all items similar to i (where similarity is usually computed by pairwisely comparing the item-vectors
 * of the user-item matrix)
 *
 * Prediction(u,i) = sum(all n from N: similarity(i,n) * rating(u,n)) / sum(all n from N: abs(similarity(i,n)))
 * </pre>
 */
AggregateAndRecommendReducer類reduce計算相似度程式碼如下:
@Override
  protected void reduce(VarLongWritable userID,
                        Iterable<PrefAndSimilarityColumnWritable> values,
                        Context context) throws IOException, InterruptedException {
    if (booleanData) {
      reduceBooleanData(userID, values, context);
    } else {
      reduceNonBooleanData(userID, values, context);
    }
  }

  private void reduceBooleanData(VarLongWritable userID,
                                 Iterable<PrefAndSimilarityColumnWritable> values,
                                 Context context) throws IOException, InterruptedException {
    /* having boolean data, each estimated preference can only be 1,
     * however we can't use this to rank the recommended items,
     * so we use the sum of similarities for that. */
    Iterator<PrefAndSimilarityColumnWritable> columns = values.iterator();
    Vector predictions = columns.next().getSimilarityColumn();
    while (columns.hasNext()) {
      predictions.assign(columns.next().getSimilarityColumn(), Functions.PLUS);
    }
    writeRecommendedItems(userID, predictions, context);
  }

  private void reduceNonBooleanData(VarLongWritable userID,
                        Iterable<PrefAndSimilarityColumnWritable> values,
                        Context context) throws IOException, InterruptedException {
    /* each entry here is the sum in the numerator of the prediction formula */
    Vector numerators = null;
    /* each entry here is the sum in the denominator of the prediction formula */
    Vector denominators = null;
    /* each entry here is the number of similar items used in the prediction formula */
    Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);

    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
      Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
      float prefValue = prefAndSimilarityColumn.getPrefValue();
      /* count the number of items used for each prediction */
      for (Element e : simColumn.nonZeroes()) {
        int itemIDIndex = e.index();
        numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
      }

      if (denominators == null) {
        denominators = simColumn.clone();
      } else {
        denominators.assign(simColumn, Functions.PLUS_ABS);
      }

      if (numerators == null) {
        numerators = simColumn.clone();
        if (prefValue != BOOLEAN_PREF_VALUE) {
          numerators.assign(Functions.MULT, prefValue);
        }
      } else {
        if (prefValue != BOOLEAN_PREF_VALUE) {
          simColumn.assign(Functions.MULT, prefValue);
        }
        numerators.assign(simColumn, Functions.PLUS);
      }

    }

    if (numerators == null) {
      return;
    }

    Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (Element element : numerators.nonZeroes()) {
      int itemIDIndex = element.index();
      /* preference estimations must be based on at least 2 datapoints */
      if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
        /* compute normalized prediction */
        double prediction = element.get() / denominators.getQuick(itemIDIndex);
        recommendationVector.setQuick(itemIDIndex, prediction);
      }
    }
    writeRecommendedItems(userID, recommendationVector, context);
  }

  /**
   * find the top entries in recommendationVector, map them to the real itemIDs and write back the result
   */
  private void writeRecommendedItems(VarLongWritable userID, Vector recommendationVector, Context context)
    throws IOException, InterruptedException {

    TopItemsQueue topKItems = new TopItemsQueue(recommendationsPerUser);

    for (Element element : recommendationVector.nonZeroes()) {
      int index = element.index();
      long itemID;
      if (indexItemIDMap != null && !indexItemIDMap.isEmpty()) {
        itemID = indexItemIDMap.get(index);
      } else { //we don't have any mappings, so just use the original
        itemID = index;
      }
      if (itemsToRecommendFor == null || itemsToRecommendFor.contains(itemID)) {
        float value = (float) element.get();
        if (!Float.isNaN(value)) {

          MutableRecommendedItem topItem = topKItems.top();
          if (value > topItem.getValue()) {
            topItem.set(itemID, value);
            topKItems.updateTop();
          }
        }
      }
    }

    List<RecommendedItem> topItems = topKItems.getTopItems();
    if (!topItems.isEmpty()) {
      recommendedItems.set(topItems);
      context.write(userID, recommendedItems);
    }
  }