1. 程式人生 > >[大資料]ItemBased協同過濾hadoop實現

[大資料]ItemBased協同過濾hadoop實現

協同過濾演算法這裡就不再敘述了

隨便說一點,通常我們使用的基於物品和使用者的協同過濾演算法於AndrewNG的機器學習描述的有些不一樣

視訊中虛擬了一個X出來,然後通過梯度下降法不斷的計算X和theta使costFunction(與y關聯)最小。

實際使用中,X這個表示物品的特徵引數已經去掉了,直接使用Y(使用者對物品的評分)來計算物品(使用者)相似度以及做推薦

這裡我們定義 物品uv的相似度為 ,N(u)對物品u有過評分的使用者列表。

定義推薦為使用者i對物品u的興趣度, S(u,K)為與u相似的前K個物品,N(i)為使用者i有過評分的物品

rvi為使用者i對v的評分.

演算法實現不難,但是一旦牽扯到大資料,就出了規模性問題

首先,物品或者使用者某一項肯定會很大,演算法中使用者*物品的矩陣肯定不能直接儲存

eg:100w使用者1w商品則開出來的矩陣有10,000,000,000元素每個元素假定4位元組也有超過32G的記憶體,

因此這裡不能直接把某些項放入記憶體做cache來優化計算,不然隨著問題規模的擴大遲早爆記憶體。

這裡採用空間換時間,利用mapreduce的排序特性做檔案儲存計算了,充分利用key會排序這個特性進行多步mapreduce計算

輸入格式:uid,pid,score的一系列日誌資料。

mapreduce過程分為3步:

1、計算物品之間的相關度

map:in uid,pid,score

map:out <uid> pid score

map把所有的輸入處理為以uid為key的輸出,這樣做的目的是通過shuffle後所有的uid相同的會排在一起,方便reduce計算物品共同出現的次數

核心程式碼:

    dict = {}
    for line in sys.stdin:
        (uid, pid, score) = line.strip('\n').split(',')
        print "%s\t%s\t%d" %(uid,pid,score)

reduce:in <uid> pid score

reduce:out pida:pidb:val

reduce讀入map中所有同一個使用者的評分商品後,對這些商品兩兩出現次數+1

最後計算物品相似度。

這裡我們假設物品的數量有限,可以在記憶體中儲存物品到物品共線矩陣,當然

我們只儲存非0的,稀疏矩陣。

def process(listPid,res):
    for keyA in listPid:
        for keyB in listPid:
            res[keyA+':'+keyB] = res.get(keyA+':'+keyB, 0) + 1#這裡keyA=keyB時也計算在內是有意義的,因為res[keyA:keyA]每個使用者計算會+1,所以最後的結果是keyA出現的總次數。既N(i)的值。


def run():
    dictP2P={}
    preUid = ""
    listPid = []


    for line in sys.stdin:
        (uid, pid, score) = line.strip().split('\t')
        if uid != preUid:
            if preUid != "":
                process(listPid, dictP2P)
                listPid=[]
            preUid = uid

        listPid.append(pid)

    process(listPid, dictP2P)

    dictW = {}

    for key in dictP2P:
        (keyA,keyB) = key.split(':')
        obj = dictW.get(keyA, {})
        obj[keyB] = dictP2P[key]/math.sqrt(dictP2P[keyA+':'+keyA]*dictP2P[keyB+':'+keyB])
        dictW[keyA] = obj

    for pid in dictW:
        sl = sorted(dictW[pid].iteritems(), key=lambda d:d[1], reverse = True)
        for pt in sl:
            print "%s:%s:%.4lf" %(pid, pt[0], pt[1])

2、計算使用者推薦

有了使用者的評分記錄:u,p,score

以及物品的相似度記錄 p1:p2:r

我們可以計算使用者與物品的興趣度了

參看公式,我們這裡關鍵是要求出物品相似度與使用者評分的交集,

這裡map我們把兩項記錄中的p提取出來做key

這樣reduce過程中相同p的記錄就會在一起,

例如這些記錄:

p1,u1,3

p1,u2,4

p1,p2,0.4

p1,p3,0.6

所以通過p1這個記錄,我們可以計算 P(u1,p2),p(u1,p3),p(u2,p2),p(u2,p3)的部分值

(這裡只是部分,後面可能u1,p2還能通過p6組合在一起,至於沒有計算到的其他物品,計算了也會是0,就忽略)

所以這裡的輸出就是:

<u:p> sc,u:p組合為key,進入下一輪進行系統性求和

程式碼如下:

for line in sys.stdin:
            line = line.strip("\n")
            if line.find(':')>0:
                (p1,p2,sc) = line.split(':')
                print "%s\t%s:%s" %(p1,p2,sc)
            else:
                (u1,p1,sc) = line.split(',')
                print "%s\t%s,%s" %(p1,u1,sc)
Tips這裡通過“:”,“,”來區分字符集,還有很多其他方法比如mapper的環境變數,或者新增欄位

怎麼方便怎麼來吧。

reduce:

def process(mapUid, mapPid, mapReco):
    #print mapUid,mapPid
    for uid in mapUid:
        for pid in mapPid:
            obj = mapReco.get(uid, {})
            obj[pid] = mapUid[uid] * mapPid[pid] + obj.get(pid, 0)
            mapReco[uid] = obj
def run():
    dictP2P={}
    prePid = ""
    mapPid={}
    mapUid={}
    mapReco = {}

    for line in sys.stdin:
        (pid, val) = line.strip().split('\t')
        if pid != prePid:
            if prePid != "":
                process(mapUid, mapPid, mapReco)
                mapUid={}
                mapPid={}
            prePid = pid

        if val.find(':')>0:
            (p,v) = val.split(':')
            mapPid[p] = float(v)
        else:
            (u,v) = val.split(',')
            mapUid[u] = float(v)

    process(mapUid, mapPid, mapReco)

    for uid in mapReco:
        sl = sorted(mapReco[uid].iteritems(), key=lambda d:d[1], reverse = True)
        for pt in sl:
            print "%s:%s:%.4lf" %(uid, pt[0], pt[1])

3、矯正使用者推薦

這裡就是上面過程的資料進行求和了,很簡單,因為多機並行會產生重複的key

mapper:

    for line in sys.stdin:
        try:
            line = line.strip("\n")
            (u,p,sc) = line.split(':')
            print "%s:%s\t%s" %(u,p,sc)


        except:
            #sys.stderr.write(line+'\n')
            #import traceback
            traceback.print_exc()
            pass

reducer:

def run():
    task_conf=json.load(open('_job.conf'))
    dict = {}
    pre = ""
    fsum = 0.0
    for line in sys.stdin:
        line = line.strip("\n")
        (key,sc) = line.split('\t')
        if key != pre:
            if pre!="":
                print "%s:%.4lf" %(pre, fsum)
            fsum=float(sc)
            pre = key
        fsum = fsum + float(sc)
    print "%s:%.4lf" %(pre, fsum)

這些就是最基礎的演算法了,計算還可以做很多優化,以及特判