1. 程式人生 > >推薦系統實踐 0x06 基於鄰域的演算法(1)

推薦系統實踐 0x06 基於鄰域的演算法(1)

# 基於鄰域的演算法(1) 基於鄰域的演算法主要分為兩類,一類是基於使用者的協同過濾演算法,另一類是基於物品的協同過濾演算法。我們首先介紹基於使用者的協同過濾演算法。 ## 基於使用者的協同過濾演算法(UserCF) 基於使用者的協同過濾演算法是最古老的演算法了,它標誌著推薦系統的誕生。當一個使用者甲需要個性化推薦時,首先找到那些跟他興趣相似的使用者,然後把那些使用者喜歡的,甲沒有聽說過的物品推薦給使用者甲,那麼這種方式就叫做基於使用者的協同過濾演算法。 那麼,這個演算法包含兩個步驟: 1. 找到和目標使用者興趣相似的使用者集合。 2. 找到這個集合中的使用者喜歡的,且目標使用者沒有聽說過的物品推薦給目標使用者。 我們用使用者行為的相似度來表示興趣的相似度。對於使用者$u$和使用者$v$,$N(u)$和$N(v)$表示各自有過正反饋的物品集合。那麼我們用Jaccard公式表示使用者$u$和使用者$v$之間的興趣相似度。 $$ w_{uv}=\frac{|N(u)\cap N(v)|}{|N(u)\cup N(v)|} $$ 另外也可以通過餘弦相似度進行計算 $$ w_{uv}=\frac{|N(u)\cap N(v)|}{\sqrt{|N(u)||N(v)|}} $$ 餘弦相似度的計算程式碼為 ```python def UserSimilarity(train): W = dict() for u in train.keys(): for v in train.keys(): if u == v: continue W[u][v] = len(train[u] & train[v]) W[u][v] /= math.sqrt(len(train[u]) * len(train[v]) * 1.0) return W ``` 如果這樣去計算的話,在使用者非常大的時候會非常耗時,因為很多使用者之間並沒有對相同的物品產生過行為,演算法也把時間浪費在計算使用者興趣相似度上。那麼我們可以對公式分子部分交集不為空的部分。 建立物品到使用者的**倒排表**,對於每個物品都儲存對該物品產生過行為的使用者列表。 ```python def UserSimilarity(train): # build inverse table for item_users item_users = dict() for u, items in train.items(): for i in items.keys(): if i not in item_users: item_users[i] = set() item_users[i].add(u) #calculate co-rated items between users C = dict() N = dict() for i, users in item_users.items(): for u in users: N[u] += 1 for v in users: if u == v: continue C[u][v] += 1 # calculate finial similarity matrix W W = dict() for u, related_users in C.items(): for v, cuv in related_users.items(): W[u][v] = cuv / math.sqrt(N[u] * N[v]) return W ``` 有了其他使用者的對某個物品$i$感興趣的評分,那麼根據相似度可以計算出使用者$u$對物品$i$的感興趣評分為: $$ p(u,i) = \sum_{v\in S(u,K) \cap N(i)}{w_{uv}r_{vi}} $$ 其中$S(u,K)$是與使用者$u$最相似的K個使用者。因為使用的是單一行為的隱反饋資料,所以所有的評分都為1。另外還可以對使用者的相似度進行改進,比如對冷門物品的興趣更能反應他們的興趣相似度。所以可以加上熱門物品相似度的懲罰。 $$ w_{uv}=\frac{\sum_{i\in N(u)\cap N(v)}\frac{1}{\log 1+|N(i)|}}{\sqrt{|N(u)||N(v)|}} $$ 我們用上一篇介紹的MovieLens資料集,以及以前介紹的評測方式來把程式碼串起來,程式碼來自於參考裡面的github,總體程式碼為: ```python import random import math import time from tqdm import tqdm def timmer(func): def wrapper(*args, **kwargs): start_time = time.time() res = func(*args, **kwargs) stop_time = time.time() print('Func %s, run time: %s' % (func.__name__, stop_time - start_time)) return res return wrapper class Dataset(): def __init__(self, fp): # fp: data file path self.data = self.loadData(fp) @timmer def loadData(self, fp): data = [] for l in open(fp): data.append(tuple(map(int, l.strip().split('::')[:2]))) return data @timmer def splitData(self, M, k, seed=1): ''' :params: data, 載入的所有(user, item)資料條目 :params: M, 劃分的數目,最後需要取M折的平均 :params: k, 本次是第幾次劃分,k~[0, M) :params: seed, random的種子數,對於不同的k應設定成一樣的 :return: train, test ''' train, test = [], [] random.seed(seed) for user, item in self.data: # 這裡與書中的不一致,本人認為取M-1較為合理,因randint是左右都覆蓋的 if random.randint(0, M - 1) == k: test.append((user, item)) else: train.append((user, item)) # 處理成字典的形式,user->set(items) def convert_dict(data): data_dict = {} for user, item in data: if user not in data_dict: data_dict[user] = set() data_dict[user].add(item) data_dict = {k: list(data_dict[k]) for k in data_dict} return data_dict return convert_dict(train), convert_dict(test) class Metric(): def __init__(self, train, test, GetRecommendation): ''' :params: train, 訓練資料 :params: test, 測試資料 :params: GetRecommendation, 為某個使用者獲取推薦物品的介面函式 ''' self.train = train self.test = test self.GetRecommendation = GetRecommendation self.recs = self.getRec() # 為test中的每個使用者進行推薦 def getRec(self): recs = {} for user in self.test: rank = self.GetRecommendation(user) recs[user] = rank return recs # 定義精確率指標計算方式 def precision(self): all, hit = 0, 0 for user in self.test: test_items = set(self.test[user]) rank = self.recs[user] for item, score in rank: if item in test_items: hit += 1 all += len(rank) return round(hit / all * 100, 2) # 定義召回率指標計算方式 def recall(self): all, hit = 0, 0 for user in self.test: test_items = set(self.test[user]) rank = self.recs[user] for item, score in rank: if item in test_items: hit += 1 all += len(test_items) return round(hit / all * 100, 2) # 定義覆蓋率指標計算方式 def coverage(self): all_item, recom_item = set(), set() for user in self.test: for item in self.train[user]: all_item.add(item) rank = self.recs[user] for item, score in rank: recom_item.add(item) return round(len(recom_item) / len(all_item) * 100, 2) # 定義新穎度指標計算方式 def popularity(self): # 計算物品的流行度 item_pop = {} for user in self.train: for item in self.train[user]: if item not in item_pop: item_pop[item] = 0 item_pop[item] += 1 num, pop = 0, 0 for user in self.test: rank = self.recs[user] for item, score in rank: # 取對數,防止因長尾問題帶來的被流行物品所主導 pop += math.log(1 + item_pop[item]) num += 1 return round(pop / num, 6) def eval(self): metric = { 'Precision': self.precision(), 'Recall': self.recall(), 'Coverage': self.coverage(), 'Popularity': self.popularity() } print('Metric:', metric) return metric # 1. 隨機推薦 def Random(train, K, N): ''' :params: train, 訓練資料集 :params: K, 可忽略 :params: N, 超引數,設定取TopN推薦物品數目 :return: GetRecommendation,推薦介面函式 ''' items = {} for user in train: for item in train[user]: items[item] = 1 def GetRecommendation(user): # 隨機推薦N個未見過的 user_items = set(train[user]) rec_items = {k: items[k] for k in items if k not in user_items} rec_items = list(rec_items.items()) random.shuffle(rec_items) return rec_items[:N] return GetRecommendation # 2. 熱門推薦 def MostPopular(train, K, N): ''' :params: train, 訓練資料集 :params: K, 可忽略 :params: N, 超引數,設定取TopN推薦物品數目 :return: GetRecommendation, 推薦介面函式 ''' items = {} for user in train: for item in train[user]: if item not in items: items[item] = 0 items[item] += 1 def GetRecommendation(user): # 隨機推薦N個沒見過的最熱門的 user_items = set(train[user]) rec_items = {k: items[k] for k in items if k not in user_items} rec_items = list( sorted(rec_items.items(), key=lambda x: x[1], reverse=True)) return rec_items[:N] return GetRecommendation # 3. 基於使用者餘弦相似度的推薦 def UserCF(train, K, N): ''' :params: train, 訓練資料集 :params: K, 超引數,設定取TopK相似使用者數目 :params: N, 超引數,設定取TopN推薦物品數目 :return: GetRecommendation, 推薦介面函式 ''' # 計算item->user的倒排索引 item_users = {} for user in train: for item in train[user]: if item not in item_users: item_users[item] = [] item_users[item].append(user) # 計算使用者相似度矩陣 sim = {} num = {} for item in item_users: users = item_users[item] for i in range(len(users)): u = users[i] if u not in num: num[u] = 0 num[u] += 1 if u not in sim: sim[u] = {} for j in range(len(users)): if j == i: continue v = users[j] if v not in sim[u]: sim[u][v] = 0 sim[u][v] += 1 for u in sim: for v in sim[u]: sim[u][v] /= math.sqrt(num[u] * num[v]) # 按照相似度排序 sorted_user_sim = {k: list(sorted(v.items(), \ key=lambda x: x[1], reverse=True)) \ for k, v in sim.items()} # 獲取介面函式 def GetRecommendation(user): items = {} seen_items = set(train[user]) for u, _ in sorted_user_sim[user][:K]: for item in train[u]: # 要去掉使用者見過的 if item not in seen_items: if item not in items: items[item] = 0 items[item] += sim[user][u] recs = list(sorted(items.items(), key=lambda x: x[1], reverse=True))[:N] return recs return GetRecommendation # 4. 基於改進的使用者餘弦相似度的推薦 def UserIIF(train, K, N): ''' :params: train, 訓練資料集 :params: K, 超引數,設定取TopK相似使用者數目 :params: N, 超引數,設定取TopN推薦物品數目 :return: GetRecommendation, 推薦介面函式 ''' # 計算item->user的倒排索引 item_users = {} for user in train: for item in train[user]: if item not in item_users: item_users[item] = [] item_users[item].append(user) # 計算使用者相似度矩陣 sim = {} num = {} for item in item_users: users = item_users[item] for i in range(len(users)): u = users[i] if u not in num: num[u] = 0 num[u] += 1 if u not in sim: sim[u] = {} for j in range(len(users)): if j == i: continue v = users[j] if v not in sim[u]: sim[u][v] = 0 # 相比UserCF,主要是改進了這裡 sim[u][v] += 1 / math.log(1 + len(users)) for u in sim: for v in sim[u]: sim[u][v] /= math.sqrt(num[u] * num[v]) # 按照相似度排序 sorted_user_sim = {k: list(sorted(v.items(), \ key=lambda x: x[1], reverse=True)) \ for k, v in sim.items()} # 獲取介面函式 def GetRecommendation(user): items = {} seen_items = set(train[user]) for u, _ in sorted_user_sim[user][:K]: for item in train[u]: # 要去掉使用者見過的 if item not in seen_items: if item not in items: items[item] = 0 items[item] += sim[user][u] recs = list(sorted(items.items(), key=lambda x: x[1], reverse=True))[:N] return recs return GetRecommendation class Experiment(): def __init__(self, M, K, N, fp='./ml-1m/ratings.dat', rt='UserCF'): ''' :params: M, 進行多少次實驗 :params: K, TopK相似使用者的個數 :params: N, TopN推薦物品的個數 :params: fp, 資料檔案路徑 :params: rt, 推薦演算法型別 ''' self.M = M self.K = K self.N = N self.fp = fp self.rt = rt self.alg = {'Random': Random, 'MostPopular': MostPopular, \ 'UserCF': UserCF, 'UserIIF': UserIIF} # 定義單次實驗 @timmer def worker(self, train, test): ''' :params: train, 訓練資料集 :params: test, 測試資料集 :return: 各指標的值 ''' getRecommendation = self.alg[self.rt](train, self.K, self.N) metric = Metric(train, test, getRecommendation) return metric.eval() # 多次實驗取平均 @timmer def run(self): metrics = {'Precision': 0, 'Recall': 0, 'Coverage': 0, 'Popularity': 0} dataset = Dataset(self.fp) for ii in range(self.M): train, test = dataset.splitData(self.M, ii) print('Experiment {}:'.format(ii)) metric = self.worker(train, test) metrics = {k: metrics[k] + metric[k] for k in metrics} metrics = {k: metrics[k] / self.M for k in metrics} print('Average Result (M={}, K={}, N={}): {}'.format(\ self.M, self.K, self.N, metrics)) # 1. random實驗 M, N = 8, 10 K = 0 # 為保持一致而設定,隨便填一個值 random_exp = Experiment(M, K, N, rt='Random') random_exp.run() # 2. MostPopular實驗 M, N = 8, 10 K = 0 # 為保持一致而設定,隨便填一個值 mp_exp = Experiment(M, K, N, rt='MostPopular') mp_exp.run() # 3. UserCF實驗 M, N = 8, 10 for K in [5, 10, 20, 40, 80, 160]: cf_exp = Experiment(M, K, N, rt='UserCF') cf_exp.run() # 4. UserIIF實驗 M, N = 8, 10 K = 80 # 與書中保持一致 iif_exp = Experiment(M, K, N, rt='UserIIF') iif_exp.run() ```` ## 參考 [推薦系統程式碼實現](https://github.com/Magic-Bubble/RecommendSystemP