1. 程式人生 > 實用技巧 >Apriori關聯分析

Apriori關聯分析

關聯分析

關聯關係是一種非常有用的資料探勘演算法,它可以分析出資料內在的關聯關係。其中比較著名的是啤酒和尿不溼的案例

交易號 清單
0 豆奶,萵苣
1 萵苣,尿布,啤酒,甜菜
2 豆奶,尿布,啤酒,橙汁
3 萵苣,豆奶,尿布,啤酒
4 萵苣,豆奶,尿布,橙汁

當超市在分析顧客的購物清單時發現一個比較奇怪的問題,為什麼大部顧客在購買啤酒的時候還會買啤酒呢?後來經過超市的調查發現,顧客的妻子提醒丈夫買尿不溼時丈父會把自己的啤酒也一起帶上。這是超市調查發現的尿不溼和啤酒的關係,如果資料量小我們還是可以處理的,但是涉及到大資料時,其複雜度就非常高,那我們有沒有其它方式去尋找這種關係呢?其實我們可以使用關聯演算法去挖掘我們商品之間的關聯關係,這些關係可以有兩種形式:頻繁項集或者關聯規則。頻繁項集(frequent item sets)是經常出現在一塊的物品的集合(即經常被一起購買的商品),關聯規則(association rules)暗示兩種物品之間可能存在很強的關係。

支援度和置信度

為了有效定義頻繁和關聯,我們引入兩個概念,支援度和置信度。

支援度(support),即事件發生的頻率,記作 ,例如一共有5條記錄,啤酒和尿布出現的次數是3次,這樣啤酒的支援度就是 3 / 5 = 0.6,支援度越大表格商品出現的次數就越多。

置信度(confidence),置信度揭事瞭如果事件A發生了,則事件B發生的的概率,記作 ,

例如啤酒和尿布被購買時,橙汁也一起被購買的概率記作 ,置信度的值表示事件A和B同時發生後的概率佔了A事件出現的概率的百份比,值越大表明買了啤酒和尿刷的顧客購買橙汁的概率比較大。

Apriori原理

由上面的介紹可以知道,我們需要計算所有組合的支援度和置信度,從而計算出所有可能被購買的頻繁項集這樣我們就可以對商品進行合理的佈局。

支援度

下面為了說明問題我們先假設我們有n種商品,那我們所有的商品排列組合為

其中k表示頻繁集合中元素的個數,也就是說我們要運算出所有的頻繁集合的話,時間複雜度為,如下圖(來自網路)例示我們有四種商品0, 1, 2, 3,那我們就有種頻繁集合,如果我們有1000種頻繁集合的話,可想而知這個資料量是非常的龐大的。

置信度

置信度的計算方式是我們遍歷所有的頻繁項集,然後找出每一個項集的所有的子集再使用上面的公式來計算出所有子集的置信度,這些子集的意思就是那些商品最可能被一起購買的商品組合,舉個例子如果我們有頻繁項集{0, 2, 3}那它可能出現的所有的子集是{0, 2}, {0, 3}, {2, 3}, {0, 2, 3}。使用排列組合的知識我們同理可以得出:

個排列組合,根據(1), (2)兩個公式我們可以得到計算所有子集的計算複雜度為 , 從公式可以看出我們如果要計算的時間複雜度非常高,我們需要把計算置信度方式進行降維。

下面我們先介紹Apriori的兩個定理:

定理1: 如果一個項集是頻繁的,那麼其所有的子集(subsets)也一定是頻繁的。 這個比較容易證明,因為某項集的子集的支援度一定不小於該項集。

定理2: 如果一個項集是非頻繁的,那麼其所有的超集(supersets)也一定是非頻繁的。 定理2是上一條定理的逆反定理。根據定理2,可以對項集樹進行如下剪枝(下圖來自網路):

因為資料量大的話,生成頻集的計算量也是非常大的,而Apriori給出的兩個生成頻繁集項的方法:

1.即第k-1項的所有頻繁集項和第一項的頻繁集項進行組合生成第k項候選頻繁集量,但是並不是所有的結果都是滿足需求的,我們要設定最小頻繁項集閥值,只有大於閥值才會轉正成為頻繁項集。

2.,選擇前項均相同的進行合併,生成頻繁集,這裡有個要求是必須是有序的,否則生成出來的項集並並不會符合要求。

演算法實現


# -*- coding:gbk -*-
'''
Created on 2018年2月12日

@author: Belle
'''
from sklearn.feature_extraction import DictVectorizer
from dask.array.chunk import arange
import time; # 引入time模組
from apriori2 import apriori

SUPPORT_DIVIDER = ","

CONFIDENCE_DIVIDER = "=>"

'''
構建模型
'''

class Apriori():
def init(self, dataSet, minSupport, minConfidence):
self.vec = DictVectorizer()
'''最小支援度'''
self.minSupport = minSupport
'''最小置信度'''
self.minConfidence = minConfidence
'''整個列表,陣列的行表示單個特證向量,裡面的特證不重複,而且每一行的長度有可能不一樣'''
self.dataSet = dataSet
self.numOfTypes = len(dataSet)
'''構建所有種類出現的次數'''
self.dataTypeMap = {}
'''初始化一項式'''
self.dataTypeMap[1] = createTrainSet(self.dataSet)

'''
構學無監督學習的資料
'''

def createTrainSet(dataTypeMap):
dataTypeMapResult = {}
for row in range(len(dataTypeMap)):
rowValues = dataTypeMap[row]
rowValues.sort()
for column in range(len(rowValues)):
value = str(rowValues[column])
if value in dataTypeMapResult:
'''更新當前鍵出現的次數'''
dataTypeMapResult[value] = dataTypeMapResult[value] + 1
else:
'''第一次出現的資料值為1'''
dataTypeMapResult[value] = 1
return dataTypeMapResult

'''
analize_x 為n*k列距陣
'''

def analize(dataSet, minSupport = 0.15, minConfidence = 0.7):
row = 2
apriori = Apriori(dataSet, minSupport, minConfidence)
'''從C(2, n), C(3, n)....到C(n, n)'''
while True:
if innerLoop(apriori, row) == 0:
break
row = row + 1
'''生成關規則'''
generateRule(apriori)
return apriori

'''
計算通過k-1項,計算k項的資料
'''

def innerLoop(apriori, kSet):
'''候選 k項式修選集'''
kSetItems = {}
beforeLenght = len(kSetItems)

<span class="hljs-string">'''選擇k項式的值'''</span>
print(<span class="hljs-string">"選擇{0}項式的值開始..."</span>.format(kSet))
startTime = time.time()
scanKMinusItems(kSetItems, apriori, kSet)
print(<span class="hljs-string">"獲取候選{0}項式時間:"</span>.format(kSet) + str(time.time() - startTime))

<span class="hljs-string">'''對候選集進行剪枝'''</span>
print(<span class="hljs-string">"剪枝開始,剪枝數量{0}..."</span>.format(len(kSetItems)))
startTime = time.time()
sliceBranch(kSetItems, apriori)
print(<span class="hljs-string">"剪枝花費時間:"</span> + str(time.time() - startTime))
<span class="hljs-string">'''存在下一個key_set,則放在結果中'''</span>
afterLength = len(kSetItems)
<span class="hljs-keyword">if</span> afterLength != beforeLenght:
    apriori.dataTypeMap[kSet] = kSetItems
    <span class="hljs-keyword">return</span> <span class="hljs-number">1</span>
<span class="hljs-keyword">else</span>:
    <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>

'''
通過1項式和k-1項式生成k項式
'''

def scanKMinusItems(kSetItems, apriori, kSet):
'''頻集1項式和k-1項式,組成新的k項式,然後把不滿足的項式去掉'''
'''頻集1項式'''
keys = list(apriori.dataTypeMap[1].keys())

<span class="hljs-string">'''k-1項式,,1項式和k-1項式組成k項式'''</span>
kMinusOneKeys = list(apriori.dataTypeMap[kSet - <span class="hljs-number">1</span>].keys())
<span class="hljs-string">'''生成候選集'''</span>
<span class="hljs-keyword">for</span> row <span class="hljs-keyword">in</span> range(len(keys)):
    <span class="hljs-keyword">for</span> column <span class="hljs-keyword">in</span> range(len(kMinusOneKeys)):
        <span class="hljs-string">'''2項式時,由於1項式和1項式進行組合,需要去除相同的組合數'''</span>
        <span class="hljs-keyword">if</span> kSet == <span class="hljs-number">2</span> <span class="hljs-keyword">and</span> row == column:
            <span class="hljs-keyword">continue</span>
        calc(keys[row], kMinusOneKeys[column], kSetItems)

'''
生成候選頻繁集
@param oneDataSetKey: 1項式的key值
@param dataSet: 訓練集1項式
@param kMinusOneItemKey: k - 1項式的key值
@param kDataSetItems: k項式map資料
'''

def calc(oneDataSetKey, kMinusOneItemKey, kDataSetItems):
if kMinusOneItemKey.find(oneDataSetKey) != -1:
return
kDataSetItemKeys = kMinusOneItemKey + SUPPORT_DIVIDER + str(oneDataSetKey)
'''分割成陣列,再進行排序'''
kItemKeys = kDataSetItemKeys.split(SUPPORT_DIVIDER)
kItemKeys.sort()
'''list合成欄位串'''
kDataSetItemKeys = SUPPORT_DIVIDER.join(kItemKeys)
'''kDataSetItemKeys初始值為0'''
if kDataSetItemKeys in kDataSetItems.keys():
kDataSetItems[kDataSetItemKeys] += 1
else:
kDataSetItems[kDataSetItemKeys] = 0

'''
對後選頻煩集進行剪枝
@param kDataSetItems
'''

def sliceBranch(kDataSetItems, apriori):
kItemKeyArrays = list(kDataSetItems.keys())
'''計算kItemKeys數組裡面的所有元素同時在dataSet中出現的次數'''
dataSets = {}
for kItemKeys in kItemKeyArrays:
kItemKeyArray = kItemKeys.split(SUPPORT_DIVIDER)
kDataSetItemCount = 0
setData = set(kItemKeyArray)
for rowIndex in range(len(apriori.dataSet)):
if rowIndex in dataSets:
rowArray = dataSets[rowIndex]
else:
rowArray = set(apriori.dataSet[rowIndex])
dataSets[rowIndex] = rowArray
'''長度大於資料長度'''
if len(rowArray) < len(kItemKeyArray):
continue
'''判斷所有元素是否都在列表中同時存在'''
if setData.issubset(set(rowArray)):
kDataSetItemCount += 1
'''小於最小支援度,則不新增到列表中'''
if apriori.minSupport > kDataSetItemCount / apriori.numOfTypes:
del kDataSetItems[kItemKeys]
else:
kDataSetItems[kItemKeys] = kDataSetItemCount

'''
計算置信度
@param kDataSetItems: 頻集資料集{1:{'1, 2, 3':次數}}
@param apriori: 關聯資料類
'''

def generateRule(apriori):
cacheKeySet = {}
resultConfidence = {}
'''key是頻集集合,value代表是K項式的k值'''
for key in apriori.dataTypeMap:
if key == 1:
continue
innerMap = apriori.dataTypeMap[key]
for innerKey in innerMap:
keyArray = innerKey.split(SUPPORT_DIVIDER)
generateRule2(apriori, keyArray, innerMap[innerKey], resultConfidence, len(keyArray) - 1)

'''
目標繁集項和源繁集項兩兩結合在一起
@param kDataSetItems: 二項式繁集項
@param targetItems: 某個目標繁集
@param sourceItems: 源繁集項
'''

def generateRule2(apriori, targetItems, supportTargetItems, resultConfidence, childIndex):
if childIndex <= 0:
return
dataMap = apriori.dataTypeMap
'''資料長度'''
dataLength = len(targetItems)
totalSets = set(targetItems)
'''構造targetItems非空真子集,並計算至信度'''
for index in range(dataLength):
'''超過了陣列長度'''
if index + childIndex > dataLength:
break
'''從index開始取childIndex個數據表示是leftDataSet'''
leftDataArray = targetItems[index:childIndex + index]
leftDataArray.sort()
'''取總列表與左邊的列表相減,就是右列key'''
rightDataArray = list(totalSets ^ set(leftDataArray))
rightDataArray.sort()

    leftDataKeyString = SUPPORT_DIVIDER.join(leftDataArray)
    rightDataKeyString = SUPPORT_DIVIDER.join(rightDataArray)
    
    <span class="hljs-string">'''不存在數量為陣列長度的頻集'''</span>
    <span class="hljs-keyword">if</span> (len(leftDataArray) <span class="hljs-keyword">not</span> <span class="hljs-keyword">in</span> dataMap) <span class="hljs-keyword">or</span> (len(rightDataArray) <span class="hljs-keyword">not</span> <span class="hljs-keyword">in</span> dataMap):
        <span class="hljs-keyword">continue</span>
    
    <span class="hljs-string">'''非頻集'''</span>
    <span class="hljs-keyword">if</span> (leftDataKeyString <span class="hljs-keyword">not</span> <span class="hljs-keyword">in</span> dataMap[len(leftDataArray)]) <span class="hljs-keyword">or</span> \
        (rightDataKeyString <span class="hljs-keyword">not</span> <span class="hljs-keyword">in</span> dataMap[len(rightDataArray)]):
        <span class="hljs-keyword">continue</span>
    
    <span class="hljs-string">'''leftDataKey出現的時,rightDataKeyString出現的概率,即頻集列表中兩個出現的數量'''</span>
    confidence = supportTargetItems / \
        dataMap[len(leftDataArray)][leftDataKeyString]
    <span class="hljs-keyword">if</span> confidence &gt; apriori.minConfidence:
        <span class="hljs-string">'''置信度大於閥值'''</span>
        print(<span class="hljs-string">"{0}===&gt;&gt;{1}: confidence = {2}"</span>.format(leftDataKeyString, rightDataKeyString, str(confidence)))
        resultConfidence[leftDataKeyString + CONFIDENCE_DIVIDER + rightDataKeyString] = confidence
    <span class="hljs-keyword">else</span>:
        <span class="hljs-string">'''置信度小於閥值,放在ignore例表中,用於下次判'''</span>
<span class="hljs-string">'''遞規的方式雲偏歷'''</span>
generateRule2(apriori, targetItems, supportTargetItems, resultConfidence, childIndex - <span class="hljs-number">1</span>)

複製程式碼

測試程式碼

# -*- coding:gbk -*-
'''
Created on 2018年2月12日

@author: Belle
'''
import Apriori

analize_x = [["豆奶", "萵苣"],\
             ["萵苣", "尿布", "啤酒", "甜菜"],\
             ["豆奶", "尿布", "啤酒", "橙汁"],\
             ["萵苣", "豆奶", "尿布", "啤酒"],\
             ["萵苣", "豆奶", "尿布", "橙汁"]]

apriori = Apriori.analize(analize_x)
print(apriori.dataTypeMap)

複製程式碼