【Python】 大規模電影推薦
簡介
推薦系統把我們從洪水般氾濫的資訊中解放出來,為我們制定了個性化的資訊流。網易雲音樂、電子商務平臺等都從推薦系統中獲益頗多。推薦系統的實現是如此簡單,但是在資料量稀疏師很容易產生怪異的結果和過擬合。 最簡單最容易理解的方法就是看一下所信賴的人有哪些偏好,從中得到我們的推薦。協同過濾便是這一類推薦系統技術的基礎。簡單來說,他是基於這樣一個假設:那些和你有共同偏好的人將來也會和你擁有共同偏好;這是從人的角度來看。另一個推論是基於物品的角度——那些被同一個人所喜愛的物品很有可能出現在另一個人喜愛的物品中。這就是我們常說的基於人的協同過濾以及基於物品的協同過濾。對偏好建模
舉個簡單例子:兩個人A和B,他們對同一個物品有著共同的偏好。如果A對另外一件物品如遊戲機感興趣,那麼和任意選一個物品相比,B更有可能也對遊戲機感興趣。而我們相信,A和B的共同偏好可以從他們大量的已有偏好中發現。通過協同過濾的分組特性,我們可以對這個世界的物品進行過濾。
最長用的偏好表達模型把排名的問題簡化成對偏好進行數值化的過程。比如:
- 布林值(是或者否)
- 頂和踩(比如反對、討厭)
- 加權資訊(點選數或者動作數)
- 廣泛的分類排名(星數從討厭到喜愛)。
這些方法都是為了能夠對個體的偏好情況進行數值化,以便後面模型的建立。
資料的載入
2.解壓資料到工作路徑。
3.為了瞭解包括使用者評分的u.data和電影詳細資訊的u.item連個檔案。利用windows下的more命令來觀察資料。(Mac和Linux下的head命令)
more u.item 2
more u.data
對於u.data,第一列為使用者ID,第二列為電影ID,第三列為評分,第四列為時間戳;u.item文中包括電影ID、標題、上映時間和IMDB連結。此外,檔案中還用一個布林值組標識的每部電影的型別,包括動作、探險、動畫、兒童、喜劇、犯罪、記錄、虛幻、黑絲、恐嚇、音樂、推理、浪漫、科幻、驚悚、戰爭和西部。
OK,下面正式進入正題~~~
1.將電影資料匯入
#encoding:utf-8 import os import csv import heapq from operator import itemgetter from datetime import datetime from collections import defaultdict def load_reviews(path, **kwargs): ''' 載入電影資料檔案 ''' options = { 'fieldnames': ('userid', 'movieid', 'rating', 'timestamp'), 'delimiter' : '\t' } options.update(kwargs) parse_date = lambda r, k: datetime.fromtimestamp(float(r[k])) parse_int = lambda r, k: int(r[k]) with open(path, 'rb') as reviews: reader = csv.DictReader(reviews, **options) for row in reader: row['movieid'] = parse_int(row, 'movieid') row['userid'] = parse_int(row, 'userid') row['rating'] = parse_int(row, 'rating') row['timestamp'] = parse_date(row, 'timestamp') yield row
2.建立一個輔助函式來輔助函式匯入:
def relative_path(path):
'''
輔助資料匯入
'''
dirname = os.path.dirname(os.path.realpath('__file__'))
path = os.path.join(dirname, path)
return os.path.normpath(path)
3.匯入電影資訊
def load_movies(path, **kwargs):
'''
讀取電影資訊
'''
options = {
'fieldnames': ('movieid', 'title', 'release', 'video', 'url'),
'delimiter' : '|',
'restkey' : 'genre'
}
options.update(**kwargs)
parse_int = lambda r,k: int(r[k])
parse_date = lambda r,k: datetime.strptime(r[k], '%d-%b-%Y') if r[k] else None
with open(path, 'rb') as movies:
reader = csv.DictReader(movies, **options)
for row in reader:
row['movieid'] = parse_int(row, 'movieid')
#print row['movieid']
row['release'] = parse_date(row, 'release')
#print row['release']
#print row['video']
yield row
4.建立一個類,在之後的分析中將會反覆用到
class MovieLens(object):
def __init__(self, udata, uitem):
self.udata = udata
self.uitem = uitem
self.movies = {}
self.reviews = defaultdict(dict)
self.load_dataset()
def load_dataset(self):
#載入資料到記憶體中,按ID為索引
for movie in load_movies(self.uitem):
self.movies[movie['movieid']] = movie
for review in load_reviews(self.udata):
self.reviews[review['userid']][review['movieid']] = review
#print self.reviews[review['userid']][review['movieid']]
5.測試
輸入以下程式碼進行測試。if __name__ == '__main__':
data = relative_path('data/ml-100k/u.data')
item = relative_path('data/ml-100k/u.item')
model = MovieLens(data, item)
尋找高評分電影
函式reviews_for_movie()遍歷所有評分字典中的值(通過userid進行索引),並檢查使用者是否對當前的movieid進行過評分,如存在,則將評分結果返回.
def reviews_for_movie(self, movieid):
for review in self.reviews.values():
if movieid in review: #存在則返回
yield review[movieid]
函式average_reviews(),返回電影ID、平均得分以及評分的個數。
def average_reviews(self):
#對所有的電影求平均水平
for movieid in self.movies:
reviews = list(r['rating'] for r in self.reviews_for_movie(movieid))
average = sum(reviews) / float(len(reviews))
yield (movieid, average, len(reviews)) #返回了(movieid,評分平均分,長度(即評價人數))
toprated函式利用heapq對結果根據平均分進行排序。
def top_rated(self, n=10):
#返回一個前n的top排行
return heapq.nlargest(n, self.bayesian_average(), key=itemgetter(1))
基於貝葉斯的電影評分演算法,由於樸素的貝葉斯平均值演算法無法對那些有較多評分數的電影之間產生有意義的比較。我們需要給每個電影一個統一 的標準分數:
這裡n是預設值,C是我們通過
C = float(sum(num for mid, avg, num in model.average_reviews())) / len(model.movies)得到的,這裡直接給出m為3,C為59。
def bayesian_average(self, c=59, m=3):
#返回一個修正後的貝葉斯平均值
for movieid in self.movies:
reviews = list(r['rating'] for r in self.reviews_for_movie(movieid))
average = ((c * m) + sum(reviews)) / float(c + len(reviews))
yield (movieid, average, len(reviews))
輸出排名前十的電影
if __name__ == '__main__':
data = relative_path('data/ml-100k/u.data')
item = relative_path('data/ml-100k/u.item')
model = MovieLens(data, item)
for mid, avg, num in model.top_rated(10):
title = model.movies[mid]['title']
print "[%0.3f average rating (%i reviews)] %s" % (avg, num,title)
結果如下:計算使用者在偏好空間中的距離
基於使用者的協同過濾以及基於物品的協同過濾是推薦系統中最常用的兩種協同過濾方式。我們把偏好空間想象成一組使用者或物品的N維特徵空間,這樣我們就可以比較使用者或者物品在向量空間中是否鄰近,因此這類協同過濾系統又被稱為最近鄰推薦系統。顯然,構建這類系統最關鍵的一步就是找到一種相似性或者距離的度量標準,我們可以根據這類標準衡量對物品的偏好程度。常見的這類標準有歐式距離、曼哈頓距離、餘弦距離、皮爾遜相關係數等、斯皮爾曼相關度等。
下面我們利用歐式距離來構建。
函式share_preferences()將找出兩個使用者A和B共同評分過的電影。
def share_preferences(self, criticA, criticB):
'''
找出兩個評論者之間的交集
'''
if criticA not in self.reviews:
raise KeyError("Couldn't find critic '%s' in data " % criticA)
if criticB not in self.reviews:
raise KeyError("Couldn't find critic '%s' in data " % criticB)
moviesA = set(self.reviews[criticA].keys())
moviesB = set(self.reviews[criticB].keys())
shared = moviesA & moviesB
#建立一個評論過的的字典返回
reviews = {}
for movieid in shared:
reviews[movieid] = (
self.reviews[criticA][movieid]['rating'],
self.reviews[criticB][movieid]['rating'],
)
return reviews
函式euclidean_distance()通過他們的共同電影偏好作為向量來計算兩個使用者之間的歐式距離在這裡我補充一下:
*閔可夫斯基距離,簡稱閔氏距離;
按q值的不同又分成
(1)絕對距離,即曼哈頓距離(q=1)
(2)歐幾里得距離(q=2)
(3)切比雪夫距離(q=無窮)
def euclidean_distance(self, criticA, criticB, prefs='users'):
'''
通過兩個人的共同偏好作為向量來計算兩個使用者之間的歐式距離
'''
#建立兩個使用者的交集
preferences = self.share_preferences(criticA,criticB)
#沒有則返回0
if len(preferences) == 0: return 0
#求偏差的平方的和
sum_of_squares = sum([pow(a-b,2) for a,b in preferences.values()])
#修正的歐式距離,返回值的範圍為[0,1]
return 1 / (1 + sqrt(sum_of_squares))
最後,照例輸入程式碼測試一下。
if __name__ == '__main__':
data = relative_path('data/ml-100k/u.data')
item = relative_path('data/ml-100k/u.item')
model = MovieLens(data, item)
print model.euclidean_distance(631,532) #A,B
結果為0.240253073352。
計算使用者相關性
這部分將利用皮爾遜相關係數來作為度量指標。函式pearson_correlation計算使用者A和使用者B的皮爾遜相關係數。
這裡給出皮爾遜相關係數的計算公式:
其中
經過整理我們具體計算相關係數是,可以用如下公式:
def pearson_correlation(self, criticA, criticB, prefs='users'):
'''
返回兩個評論者之間的皮爾遜相關係數
'''
if prefs == 'users':
preferences = self.share_preferences(criticA, criticB)
elif prefs == 'movies':
preferences = self.shared_critics(criticA, criticB)
else:
raise Exception("No preferences of type '%s'." % prefs)
length = len(preferences)
if length == 0 :return 0
#迴圈處理每一個評論者之間的皮爾遜相關係數
sumA = sumB = sumSquareA = sumSquareB = sumProducts = 0
for a, b in preferences.values():
sumA += a
sumB += b
sumSquareA += pow(a, 2)
sumSquareB += pow(b, 2)
sumProducts += a * b
#計算皮爾遜係數
numerator = (sumProducts * length) - (sumA * sumB)
denominator = sqrt(((sumSquareA*length) - pow(sumA,2)) * ((sumSquareB*length) - pow(sumB,2)))
if denominator == 0:return 0
return abs(numerator/denominator)
同理,用如下程式碼測試一下:
if __name__ == '__main__':
data = relative_path('data/ml-100k/u.data')
item = relative_path('data/ml-100k/u.item')
model = MovieLens(data, item)
print model.pearson_correlation(232,532)
結果為0.062025793538385047
為特定使用者尋找最好的影評人
在已經有兩種不同的衡量指標來計算兩個使用者之間的相似程度,接下來我們為一個特定使用者尋找最適合他的影片人,看一下兩者在洗好空間上的相似程度。 函式similar_critics()來尋找最匹配的使用者。 def similar_critics(self,user, metric='euclidean', n=None):
'''
為特定使用者尋找一個合適的影評人
'''
metrics = {
'euclidean': self.euclidean_distance,
'pearson': self.pearson_correlation
}
distance = metrics.get(metric, None)
#解決可能出現的狀況
if user not in self.reviews:
raise KeyError("Unknown user, '%s'." % user)
if not distance or not callable(distance):
raise KeyError("Unknown or unprogrammed distance metric '%s'." % metric)
#計算對使用者最合適的影評人
critics = {}
for critic in self.reviews:
#不能與自己進行比較
if critic == user:
continue
critics[critic] = distance(user,critic)
if n:
return heapq.nlargest(n, critics.items(), key=itemgetter(1))
return critics
下面利用如下程式碼分別測試一下兩種度量指標的結果:
for item in model.similar_critics(232, 'pearson', n=10):
print "%4i: %0.3f" % item
利用pearson相關係數的結果為,
for item in model.similar_critics(232, 'euclidean', n=10):
print "%4i: %0.3f" % item
利用歐式距離的結果為,
結論
皮爾遜係數會比歐式距離找到更多的相似使用者。歐式距離更傾向於那些評分完全一致的使用者,而皮爾遜相關性更傾向於線性相關使用者的相似性,因此能糾正分數膨脹現象:兩個一個使用者總是比另一個使用者評分高一星; 因此僅僅用那些相似使用者的評分無法預測一個使用者對一個新電影的評分,我們必須通過所有使用者的打分情況才能對使用者的評分做出預測。預測使用者評分
為了預測一個電影的評分,我們需要計算評論過這個電影的使用者的評分相對當前使用者的加權平均值。權重為那些評論過分的使用者和當前使用者的相似程度,很顯然,我們認為和當前使用者相似程度越高的使用者的評分應被給予更大的權重。
predict_ranking函式基於其他使用者的評分預測當前使用者對電影可能的評分。 def predict_ranking(self, user,movie, metric='euclidean', critics=None):
'''
預測一個使用者對一部電影的評分,相當於評論過這部電影的使用者對當前使用者的加權均值
並且權重取決與其他使用者和該使用者的相似程度
'''
critics = critics or self.similar_critics(user,metric=metric)
total = 0.0
simsum = 0.0
for critic, similarity in critics.items():
if movie in self.reviews[critic]:
total += similarity * self.reviews[critic][movie]['rating']
simsum += similarity
if simsum == 0.0 :return 0.0
return total / simsum
接下predict_all_rankings函式來就可以預測所有電影的評分。
def predict_all_rankings(self,user,metric='euclidean', n=None):
'''
為所有的電影預測評分,返回前n個評分的電影和它們的評分
'''
critics = self.similar_critics(user, metric=metric)
movies = {
movie:self.predict_ranking(user, movie, metric, critics)
for movie in self.movies
}
if n:
return heapq.nlargest(n, movies.items(), key=itemgetter(1))
return movies
同理,接下來輸入以下程式碼進行測試:
print model.predict_ranking(422, 50,'euclidean')
print model.predict_ranking(422,50,'pearson')
結果如下:
predict_all_rankings函式根據傳入的度量指標預測一個特定使用者對所有電影的排名,並接受一個引數n來返回排名前n的電影。
for mid ,rating in model.predict_all_rankings(578,'pearson',10):
print '%0.3f: %s' % (kerating, model.movies[mid]['title'])
結果如下:
基於物品的協同過濾
前文都是基於使用者間的相似度來進行預測,然而相似度空間我們知道可以從兩個角度去探索。以使用者為中心的協同過濾的洗好空間中以使用者為資料點,比較使用者之間的相似程度,並利用相似程度尋找和使用者相似的使用者作為預測的因素;另一種以物品為中心的協同過濾洗好空間中以物品為資料點,推薦系統根據一組物品和另一組物品的相似程度做推薦。
另外,由於物品之間的相似性變化較為緩慢,因此基於物品的協同過濾是一種常用的推薦優化方案。
函式shared_critics類似於函式shared_preferences,不同的是函式shared_preferences將找出兩個使用者A和B共同評分過的電影。而函式shared_critics將找出兩部電影有共同的使用者。函式similar_items與函式similar_critics類似,是為了尋找最合適的電影而不是尋找合適的使用者。
def shared_critics(self, movieA, movieB):
'''
返回兩部電影的交集,即兩部電影在同一個人觀看過的情況
'''
if movieA not in self.movies:
raise KeyError("Couldn't find movie '%s' in data" % movieA)
if movieB not in self.movies:
raise KeyError("Couldn't find movie '%s' in data" % movieB)
criticsA = set(critic for critic in self.reviews if movieA in self.reviews[critic])
criticsB = set(critic for critic in self.reviews if movieB in self.reviews[critic])
shared = criticsA & criticsB #和操作
#建立一個評論過的字典以返回
reviews = {}
for critic in shared:
reviews[critic] = (
self.reviews[critic][movieA]['rating'],
self.reviews[critic][movieB]['rating']
)
return reviews
def similar_items(self, movie, metric='eculidean', n=None):
metrics = {
'euclidean': self.euclidean_distance,
'pearson': self.pearson_correlation,
}
distance = metrics.get(metric, None)
#解決可能出現的狀況
if movie not in self.reviews:
raise KeyError("Unknown movie, '%s'." % movie)
if not distance or not callable(distance):
raise KeyError("Unknown or unprogrammed distance metric '%s'." % metric)
items = {}
for item in self.movies:
if item == movie:
continue
items[item] = distance(item, movie,prefs='movies')
if n:
return heapq.nlargest(n, items.items(), key=itemgetter(1))
return items
同理,輸入以下程式碼進行測試:
for movie, similarity in model.similar_items(631, 'pearson').items():
print '%0.3f : %s' % (similarity, model.movies[movie]['title'])
結果如下:
…
同理按照前面預測使用者評分的思想,基於已經計算好的相似性,我們可以按照下面 的方法進行推薦。
def predict_items_recommendation(self, user, movie, metric='euclidean'):
movie = self.similar_items(movie, metric=metric)
total = 0.0
simsum = 0.0
for relmovie, similarity in movie.items():
if relmovie in self.reviews[user]:
total += similarity * self.reviews[user][relmovie]['rating']
simsum += similarity
if simsum == 0.0:return 0.0
return total / simsum
同理,輸入以下程式碼進行測試:print model.predict_items_recommendation(232, 52, 'pearson')
結果為3.980443976。OK,宿舍馬上熄燈了,暫且寫到這兒吧,後面還有一些內容,再更新吧~~
=======================================4.24更新========================================
建立並訓練SVD模型
演算法原理簡介
由於協同過濾方法或者不能處理非常大的資料集,或者處理不好使用者評論非常少的情況(即我們所說的資料比較稀疏的情況)。矩陣分解方法可以方便地隨著觀測資料進行線性擴充套件。 矩陣分解(SVD)的目的是將原有矩陣拆解為兩個矩陣,通過它們的點技(內積、向量積)和原有矩陣相似。在這裡,我們的訓練矩陣為使用者到電影評分的一個NxM矩陣,使用者沒用評分的電影的值為空或者0。我們希望通過矩陣分解模型能夠以點積填補那些空值,作為使用者對電影評分的預測值。即: 並用下面的公式對使用者u對電影i的評分進行估計:為了實現對P和Q的估計,僅需要對qi和pu進行估計,可以最優化有一下目標函式完成:
式中,K為訓練集當中所有的已知的使用者、電影評分(即觀測到的評分部分)
有很多方法對以上目標函式進行求解,通常我們採用隨機梯度下降法(SGD,網上很多相關的~後面再寫下對SGD的總結吧)求解,通過不斷迭代更新引數和預測值的方法進行引數估計,使得誤差逐步變小。通過這種方法希望能找到一個區域性最優解,使得誤差在可接受的範圍內。
分別對pu和qi進行偏微分,可以得出分別為:和
所以引數更新方向朝梯度相反方向前進一小步:
其中
通常,隨機梯度下降方法同樣可以對改進後的模型進行引數估計,其具體迭代更新公式為:
其中是一個懲罰引數。
矩陣分解演算法對記憶體的利用效率極高,可以並行,支援多特徵向量。並且可以設定不同的置信級別。優點是顯而易見的~
訓練SVD模型
下面附上訓練SVD模型的程式碼:
def factor2(R, P=None, Q=None, K=2, steps=5000, alpha=0.0002, beta=0.02):
"""
依靠給定的引數訓練矩陣R.
:param R: N x M的矩陣,即將要被訓練的
:param P: 一個初始的N x K矩陣
:param Q: 一個初始的M x K矩陣
:param K: 潛在的特徵
:param steps: 最大迭代次數
:param alpha: 梯度下降法的下降率
:param beta: 懲罰引數
:returns: P 和 Q
"""
if not P or not Q:
P, Q = initialize(R, K)
Q = Q.T
rows, cols = R.shape
for step in xrange(steps):
eR = np.dot(P, Q) # 一次性內積即可
for i in xrange(rows):
for j in xrange(cols):
if R[i,j] > 0:
eij = R[i,j] - eR[i,j]
for k in xrange(K):
P[i,k] = P[i,k] + alpha * (2 * eij * Q[k,j] - beta * P[i,k])
Q[k,j] = Q[k,j] + alpha * (2 * eij * P[i,k] - beta * Q[k,j])
eR = np.dot(P, Q) # Compute dot product only once
e = 0
for i in xrange(rows):
for j in xrange(cols):
if R[i,j] > 0:
e = e + pow((R[i,j] - eR[i,j]), 2)
for k in xrange(K):
e = e + (beta/2) * (pow(P[i,k], 2) + pow(Q[k,j], 2))
if e < 0.001:
break
return P, Q.T
匯出SVD模型至硬碟
由於SVD模型訓練需要很長的時間,我們可以先建立一個從硬碟匯入匯出模型的機制。如果可以把矩陣分解的係數進行儲存,並在需要的時候進行服用。這時我們就要利用Python的pickle模組來進行方便地處理了~class Recommender(object):
@classmethod
def load(klass, pickle_path):
'''
接受磁碟上包含pickle序列化後的檔案路徑為引數,並用pickle模組載入檔案。
由於pickle模組在序列化是會儲存匯出時物件的所有屬性和方法,因此反序列
化出來的物件有可能已經和當前最新程式碼中的類不同。
'''
with open(pickle_path, 'rb') as pkl:
return pickle.load(pkl)
def __init__(self, udata):
self.udata = udata
self.users = None
self.movies = None
self.reviews = None
# 描述性工程
self.build_start = None
self.build_finish = None
self.description = None
self.model = None
self.features = 2
self.steps = 5000
self.alpha = 0.0002
self.beta = 0.02
self.load_dataset()
def dump(self,pickle_path):
'''
序列化方法、屬性和資料到硬碟,以便在未來匯入
'''
with open(pickle_path, 'wb' ) as pkl:
pickle.dump(self,pkl)
def load_dataset(self):
'''
載入使用者和電影的索引作為一個NxM的陣列,N是使用者的數量,M是電影的數量;標記這個順序尋找矩陣的價值
'''
self.users = set([])
self.movies = set([])
for review in load_reviews(self.udata):
self.users.add(review['userid'])
self.movies.add(review['movieid'])
self.users = sorted(self.users)
self.movies = sorted(self.movies)
self.reviews = np.zeros(shape=(len(self.users), len(self.movies)))
for review in load_reviews(self.udata):
uid = self.users.index(review['userid'])
mid = self.movies.index(review['movieid'])
self.reviews[uid, mid] = review['rating']
def build(self, output=None):
'''
訓練模型
'''
options = {
'K' : self.features,
'steps' : self.steps,
'alpha' : self.alpha,
'beta' : self.beta
}
self.build_start = time.time()
nnmf = factor2
self.P, self.Q = nnmf(self.reviews, **options)
self.model = np.dot(self.P, self.Q.T)
self.build_finish = time.time()
if output :
self.dump(output)
在做完以上步驟後,就慢慢等著模型訓練吧~
最後,利用資料集測試一下SVD模型,輸入以下程式碼利用模型來訪問預測的評分:
#利用模型來訪問預測的評分
def predict_ranking(self, user, movie):
uidx = self.users.index(user)
midx = self.movies.index(movie)
if self.reviews[uidx, midx] > 0:
return None
return self.model[uidx, midx]
並將電影做一個排名系統:
#預測電影的排名
def top_rated(self, user, n=12):
movies = [(mid, self.predict_ranking(user, mid)) for mid in self.movies]
return heapq.nlargest(n, movies, key=itemgetter(1))
至此,一個完整的電影推薦系統的框架基本搭建完成,後面就是資料集(庫)的更新,以及定期訓練SVD模型了~~