[大資料]ItemBased協同過濾hadoop實現
協同過濾演算法這裡就不再敘述了
隨便說一點,通常我們使用的基於物品和使用者的協同過濾演算法於AndrewNG的機器學習描述的有些不一樣
視訊中虛擬了一個X出來,然後通過梯度下降法不斷的計算X和theta使costFunction(與y關聯)最小。
實際使用中,X這個表示物品的特徵引數已經去掉了,直接使用Y(使用者對物品的評分)來計算物品(使用者)相似度以及做推薦
這裡我們定義 物品uv的相似度為 ,N(u)對物品u有過評分的使用者列表。
定義推薦為使用者i對物品u的興趣度, S(u,K)為與u相似的前K個物品,N(i)為使用者i有過評分的物品
rvi為使用者i對v的評分.
演算法實現不難,但是一旦牽扯到大資料,就出了規模性問題
首先,物品或者使用者某一項肯定會很大,演算法中使用者*物品的矩陣肯定不能直接儲存
eg:100w使用者1w商品則開出來的矩陣有10,000,000,000元素每個元素假定4位元組也有超過32G的記憶體,
因此這裡不能直接把某些項放入記憶體做cache來優化計算,不然隨著問題規模的擴大遲早爆記憶體。
這裡採用空間換時間,利用mapreduce的排序特性做檔案儲存計算了,充分利用key會排序這個特性進行多步mapreduce計算
輸入格式:uid,pid,score的一系列日誌資料。
mapreduce過程分為3步:
1、計算物品之間的相關度
map:in uid,pid,score
map:out <uid> pid score
map把所有的輸入處理為以uid為key的輸出,這樣做的目的是通過shuffle後所有的uid相同的會排在一起,方便reduce計算物品共同出現的次數
核心程式碼:
dict = {}
for line in sys.stdin:
(uid, pid, score) = line.strip('\n').split(',')
print "%s\t%s\t%d" %(uid,pid,score)
reduce:in <uid> pid score
reduce:out pida:pidb:val
reduce讀入map中所有同一個使用者的評分商品後,對這些商品兩兩出現次數+1
最後計算物品相似度。
這裡我們假設物品的數量有限,可以在記憶體中儲存物品到物品共線矩陣,當然
我們只儲存非0的,稀疏矩陣。
def process(listPid,res):
for keyA in listPid:
for keyB in listPid:
res[keyA+':'+keyB] = res.get(keyA+':'+keyB, 0) + 1#這裡keyA=keyB時也計算在內是有意義的,因為res[keyA:keyA]每個使用者計算會+1,所以最後的結果是keyA出現的總次數。既N(i)的值。
def run():
dictP2P={}
preUid = ""
listPid = []
for line in sys.stdin:
(uid, pid, score) = line.strip().split('\t')
if uid != preUid:
if preUid != "":
process(listPid, dictP2P)
listPid=[]
preUid = uid
listPid.append(pid)
process(listPid, dictP2P)
dictW = {}
for key in dictP2P:
(keyA,keyB) = key.split(':')
obj = dictW.get(keyA, {})
obj[keyB] = dictP2P[key]/math.sqrt(dictP2P[keyA+':'+keyA]*dictP2P[keyB+':'+keyB])
dictW[keyA] = obj
for pid in dictW:
sl = sorted(dictW[pid].iteritems(), key=lambda d:d[1], reverse = True)
for pt in sl:
print "%s:%s:%.4lf" %(pid, pt[0], pt[1])
2、計算使用者推薦
有了使用者的評分記錄:u,p,score
以及物品的相似度記錄 p1:p2:r
我們可以計算使用者與物品的興趣度了
參看公式,我們這裡關鍵是要求出物品相似度與使用者評分的交集,
這裡map我們把兩項記錄中的p提取出來做key
這樣reduce過程中相同p的記錄就會在一起,
例如這些記錄:
p1,u1,3
p1,u2,4
p1,p2,0.4
p1,p3,0.6
所以通過p1這個記錄,我們可以計算 P(u1,p2),p(u1,p3),p(u2,p2),p(u2,p3)的部分值
(這裡只是部分,後面可能u1,p2還能通過p6組合在一起,至於沒有計算到的其他物品,計算了也會是0,就忽略)
所以這裡的輸出就是:
<u:p> sc,u:p組合為key,進入下一輪進行系統性求和
程式碼如下:
for line in sys.stdin:
line = line.strip("\n")
if line.find(':')>0:
(p1,p2,sc) = line.split(':')
print "%s\t%s:%s" %(p1,p2,sc)
else:
(u1,p1,sc) = line.split(',')
print "%s\t%s,%s" %(p1,u1,sc)
Tips這裡通過“:”,“,”來區分字符集,還有很多其他方法比如mapper的環境變數,或者新增欄位
怎麼方便怎麼來吧。
reduce:
def process(mapUid, mapPid, mapReco):
#print mapUid,mapPid
for uid in mapUid:
for pid in mapPid:
obj = mapReco.get(uid, {})
obj[pid] = mapUid[uid] * mapPid[pid] + obj.get(pid, 0)
mapReco[uid] = obj
def run():
dictP2P={}
prePid = ""
mapPid={}
mapUid={}
mapReco = {}
for line in sys.stdin:
(pid, val) = line.strip().split('\t')
if pid != prePid:
if prePid != "":
process(mapUid, mapPid, mapReco)
mapUid={}
mapPid={}
prePid = pid
if val.find(':')>0:
(p,v) = val.split(':')
mapPid[p] = float(v)
else:
(u,v) = val.split(',')
mapUid[u] = float(v)
process(mapUid, mapPid, mapReco)
for uid in mapReco:
sl = sorted(mapReco[uid].iteritems(), key=lambda d:d[1], reverse = True)
for pt in sl:
print "%s:%s:%.4lf" %(uid, pt[0], pt[1])
3、矯正使用者推薦
這裡就是上面過程的資料進行求和了,很簡單,因為多機並行會產生重複的key
mapper:
for line in sys.stdin:
try:
line = line.strip("\n")
(u,p,sc) = line.split(':')
print "%s:%s\t%s" %(u,p,sc)
except:
#sys.stderr.write(line+'\n')
#import traceback
traceback.print_exc()
pass
reducer:
def run():
task_conf=json.load(open('_job.conf'))
dict = {}
pre = ""
fsum = 0.0
for line in sys.stdin:
line = line.strip("\n")
(key,sc) = line.split('\t')
if key != pre:
if pre!="":
print "%s:%.4lf" %(pre, fsum)
fsum=float(sc)
pre = key
fsum = fsum + float(sc)
print "%s:%.4lf" %(pre, fsum)
這些就是最基礎的演算法了,計算還可以做很多優化,以及特判