使用者推薦Slope One演算法與mapreduce&hive實現
使用者推薦越來越熱, Google使用MinHash, PLSI, LDA, SVD, SVM等演算法,分析使用者的喜好, 實現新聞的自動分類;新浪也用Slope One以及一些Item-based的演算法對音樂進行推薦; 淘寶定期會啟動MapReduce作業分析前一天或者一個月使用者收藏的寶貝,給相同喜好的買家提供推薦服務。
本文要描述的Slope One 演算法是一種對評分進行預測的演算法, 它相對於SVD, PLSI, LDA這一類model-based演算法來說有以下特點:
1. 簡單, 容易實現
2. 訓練得到的模型可以增量更新
3. 預測速度很快
4. 使用者可以只做過一兩次評分,就可以獲得推薦.
5. 準確度比較理想
okay, 找到一篇介紹演算法的:http://www.fuchaoqun.com/2008/09/slope_one/
講的不錯,就不再重複了。
英文wiki上也有介紹http://en.wikipedia.org/wiki/Slope_One
其中python的實現比較簡潔
# Copyright 2006 Bryan O'Sullivan <[email protected]>. # # This software may be used and distributed according to the terms # of the GNU General Public License, version 2 or later, which is # incorporated herein by reference. class SlopeOne(object): def __init__(self): self.diffs = {} self.freqs = {} def predict(self, userprefs): preds, freqs = {}, {} for item, rating in userprefs.iteritems(): for diffitem, diffratings in self.diffs.iteritems(): try: freq = self.freqs[diffitem][item] except KeyError: continue preds.setdefault(diffitem, 0.0) freqs.setdefault(diffitem, 0) preds[diffitem] += freq * (diffratings[item] + rating) freqs[diffitem] += freq return dict([(item, value / freqs[item]) for item, value in preds.iteritems() if item not in userprefs and freqs[item] > 0]) def update(self, userdata): for ratings in userdata.itervalues(): for item1, rating1 in ratings.iteritems(): self.freqs.setdefault(item1, {}) self.diffs.setdefault(item1, {}) for item2, rating2 in ratings.iteritems(): self.freqs[item1].setdefault(item2, 0) self.diffs[item1].setdefault(item2, 0.0) self.freqs[item1][item2] += 1 self.diffs[item1][item2] += rating1 - rating2 print self.diffs[item1][item2] for item1, ratings in self.diffs.iteritems(): for item2 in ratings: ratings[item2] /= self.freqs[item1][item2] if __name__ == '__main__': userdata = dict( alice=dict(squid=1.0, cuttlefish=0.5, octopus=0.2), bob=dict(squid=1.0, octopus=0.5, nautilus=0.2), carole=dict(squid=0.2, octopus=1.0, cuttlefish=0.4, nautilus=0.4), dave=dict(cuttlefish=0.9, octopus=0.4, nautilus=0.5), ) s = SlopeOne() s.update(userdata) print s.predict(dict(octopus=0.4)
現在分析一下Slope One訓練的空間及時間複雜度,
如果有m個使用者,分別對n件物品進行了評分。每個使用者得進行 n 2 次計算,將產生n(n-1)/2級別的資料量(由於diff是個對角矩陣,可以只取下三角)。所以對m個使用者來說, CPU計算時間是m n 2 , 產生的中間資料是mn(n-1)/2,最後合併m個使用者的這些資料,產生的資料量是n(n-1)/2。
這個演算法的計算量對物品資料是呈平方級別地增長,對使用者數量是線性的。比較恐怖的是它產生的中間資料,如果某使用者物品評價資料為1MB左右, 且資料是double型佔8位元組, 則有1MB / 8B = 128K,此使用者將產生的資料是1MB * (128K - 1) / 2 約為64GB資料量, 這部分中間資料是不可能放在記憶體的,只能通過磁碟,然而磁碟讀寫與主存完全不是一個級別,速度上又造成一個瓶頸。
當然也不必這麼悲觀, Slope One是一個可以進行增量的演算法。假設已經對y件物品進行了訓練,則當前訓練的時間複雜度不會超過n 2 +my 2 . 撇開增量演算法不管, 我們可以用MapReduce的優勢分散式地進行訓練(可以同時使用增量和MapReduce)。以Netflix Prize 的資料為例, 它包含480189個使用者對17770部影片的評分資料,訓練資料有17770個檔案,每個檔案代表一部影片, 其中第一行是影片的id, 其餘行是各使用者對此影片的評分記錄。
MovieID:
CustomerID,Rating,Date
這些檔案都比較小,最大的不過4793673位元組,最小的才70位元組,而MapReduce的檔案塊為64MB。小檔案對於mapreduce任務來說是不利的,將會產生太多mapper. 這兒有一種解決辦法,將tar包轉成sequecefile .
省略此步,直接把解壓後的檔案put到HDFS,然後使用一道mapreduce把資料轉成我們需要的格式。
hadoop dfs -put $NETFLIX_HOME/training_set /user/zhoumin/netflix-source # 將附件中的程式碼成slopeone-0.00.1-dev.jar後執行 hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOnePreproccessor /user/zhoumin/netflix-source/user/zhoumin/netflix
然後用SlopeOneTrainer進行訓練。
SlopeOneTrainer的原理每個mapper計算一個使用者各item的diff矩陣。瞭解hadoop中mapper執行機制的人就會發現,有的使用者資料量大,很有可能產生上面說的數十GB的中間資料, 遠遠超過io.sort.mb的值。會造成mapper不停地merge資料,致使速度較慢, 使用36臺個slaves的叢集執行netflix的這部分訓練花了4個多小時,絕大部分時間會花在mapper之上,特別是mapper的merge階段.
於是假如把中間資料交給reducer去處理,更為理想,其實此步訓練相當於一個join操作。於是使用hive比較方便。先將原始資料轉成hive所需要的格式.
hadoop jar build/slopeone-0.00.1-dev.jar redpoll.cf.slopeone.SlopeOneHive /user/zhoumin/netflix-source /user/zhoumin/netflix-hive
然後再建立兩張表,netflix是處理後的netflix訓練資料, freq_diff是訓練後的模型矩陣
CREATE EXTERNAL TABLE netflix(
movie_id STRING,
user_id STRING,
rate DOUBLE,
rate_date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/zhoumin/netflix-hive';
CREATE TABLE freq_diff (
movie_id1 STRING,
movie_id2 STRING,
freq DOUBLE,
diff DOUBLE
);
okay,執行訓練SQL
INSERT OVERWRITE TABLE freq_diff
SELECT
nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
FROM
netflix nf1
JOIN
netflix nf2 ON nf1.user_id = nf2.user_id
WHERE nf1.movie_id > nf2.movie_id
GROUP BY nf1.movie_id, nf2.movie_id;
此SQL將會產生兩道mapreduce job,使用 explain命令即可以看到, 第一道主要做join的工作,在reduce端會輸
出所有的中間資料。Hive自動會調整reducer的數量,但這兒的reducer為3, 跑得比較慢(超過9小時),可以將reducer顯式地設大些,我這兒設為160,再跑上面的訓練SQL.
set mapred.reduce.tasks=160;
兩道job第一道花了33mins, 35sec,第二道花了1hrs, 29mins, 29sec,訓練時間總共約2小時,可以接受。
訓練完畢,就可以試一試預測功能了。假設某使用者給影片1000評了2分,那麼他將會對其它影片評多少分呢? 他將喜歡哪些影片呢?
okay,先做些準備工作
CREATE TABLE predict( movie_id STRING, rate FLOAT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE; echo "1000,2" > predict_data LOAD DATA LOCAL INPATH './predict_data' OVERWRITE INTO TABLE predict;
然後就可以進行預測了:
CREATE TABLE slopeone_result(
movie_id STRING,
freq DOUBLE,
pref DOUBLE,
rate DOUBLE
);
INSERT OVERWRITE TABLE slopeone_result
SELECT
/*+ MAPJOIN(p) */
movie_id1 as movie_id,
sum(freq) as freq,
sum(freq*(diff + rate)) as pref,
sum(freq*(diff + rate))/sum(freq) as rate
FROM
predict p
JOIN freq_diff fd ON fd.movie_id2 = p.movie_id
GROUP BY movie_id1
注意上面使用了一個Map-Side Join的hint, 因為predict表非常小,只需要跑一個map only的job就可以完成join,無需shuffle資料給reduce. 這一步把使用者自身的movie_id也參與計算,由於hive不支援in,所以結果有些偏差。可以用一道MapReduce作業來做預測這一步。
最後select .. order by一下就知道此使用者喜歡哪些影片了。
結論:
1. 使用mapreduce,將運算移至reduce端, 避免map端的merge可以有效地提高訓練速度
2. Slope One是一種簡單易實現的使用者推薦演算法,而且可以增量訓練
3. 結合以上兩點,加上BigTable, HyperTable, Voldermort, Cassendera這種分散式key-value儲存庫,完全可以做到實時使用者推薦(HBase甭提了)。
-----------------------------------------------------------------------------------------------------
附: hive生成的mr job描述.
hive> explain
> INSERT OVERWRITE TABLE freq_diff
> SELECT
> nf1.movie_id, nf2.movie_id, count(1), sum(nf1.rate - nf2.rate)/count(1)
> FROM
> netflix nf1
> JOIN
> netflix nf2 ON nf1.user_id = nf2.user_id
> WHERE nf1.movie_id > nf2.movie_id
> GROUP BY nf1.movie_id, nf2.movie_id;
OK
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF netflix nf1) (TOK_TABREF netflix nf2) (= (. (TOK_TABLE_OR_COL nf1) user_id) (. (TOK_TABLE_OR_COL nf2) user_id)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB freq_diff)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf1) movie_id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL nf2) movie_id)) (TOK_SELEXPR (TOK_FUNCTION count 1)) (TOK_SELEXPR (/ (TOK_FUNCTION sum (- (. (TOK_TABLE_OR_COL nf1) rate) (. (TOK_TABLE_OR_COL nf2) rate))) (TOK_FUNCTION count 1)))) (TOK_WHERE (> (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL nf1) movie_id) (. (TOK_TABLE_OR_COL nf2) movie_id))))
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
nf2
TableScan
alias: nf2
Reduce Output Operator
key expressions:
expr: user_id
type: string
sort order: +
Map-reduce partition columns:
expr: user_id
type: string
tag: 1
value expressions:
expr: movie_id
type: string
expr: rate
type: double
nf1
TableScan
alias: nf1
Reduce Output Operator
key expressions:
expr: user_id
type: string
sort order: +
Map-reduce partition columns:
expr: user_id
type: string
tag: 0
value expressions:
expr: movie_id
type: string
expr: rate
type: double
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col2}
1 {VALUE._col0} {VALUE._col2}
outputColumnNames: _col0, _col2, _col4, _col6
Filter Operator
predicate:
expr: (_col0 > _col4)
type: boolean
Select Operator
expressions:
expr: _col0
type: string
expr: _col4
type: string
expr: _col2
type: double
expr: _col6
type: double
outputColumnNames: _col0, _col4, _col2, _col6
Group By Operator
aggregations:
expr: count(1)
expr: sum((_col2 - _col6))
keys:
expr: _col0
type: string
expr: _col4
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://xxx:9000/user/zhoumin/hive-tmp/22895032/10002
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
type: string
expr: _col1
type: string
tag: -1
value expressions:
expr: _col2
type: bigint
expr: _col3
type: double
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: sum(VALUE._col1)
keys:
expr: KEY._col0
type: string
expr: KEY._col1
type: string
mode: mergepartial
outputColumnNames: _col0, _col1, _col2, _col3
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: _col2
type: bigint
expr: (_col3 / _col2)
type: double
outputColumnNames: _col0, _col1, _col2, _col3
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: UDFToDouble(_col2)
type: double
expr: _col3
type: double
outputColumnNames: _col0, _col1, _col2, _col3
File Output Operator
compressed: true
GlobalTableId: 1
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: freq_diff
Stage: Stage-0
Move Operator
tables:
replace: true
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: freq_diff