1. 程式人生 > >MLlib的關聯演算法FPgrowth(與APriori比較)

MLlib的關聯演算法FPgrowth(與APriori比較)

1,

頻繁項集挖掘是一個關聯式規則挖掘問題。關聯挖掘是資料探勘中研究最早也是最活躍的領域,其中頻繁模式的挖掘是關聯挖掘的核心和基礎,是產生關聯規則挖掘的基礎。頻繁項集最經典的應用就是超市的購物籃分析。

2,

頻繁項:在多個集合中,頻繁出現的元素項。

頻繁項集:在一系列集合中每項都含有某些相同的元素,這些元素形成一個子集,滿足一定閥值就是頻繁項集。

K項集:K個頻繁項組成的一個集合。

支援度:包含頻繁項集(F)的集合的數目。

可信度:頻繁項與某項的並集的支援度與頻繁項集支援度的比值。

簡 單來說。頻繁項集的挖掘就是將資料集(一般是多行資料,每行資料的第一個元素的交易編號,後面的是物品編號)中出現頻率超過支援度的頻繁項找出來,而首先 找出的單個頻繁項的集合就是1-頻繁項集。而2-頻繁項就是某兩個頻繁項都同時出現在一行中並且出現頻率超過支援度的,那麼2-頻繁項集就是這些2-頻繁 項的集合,依次類推,K-頻繁項集就是K-頻繁項的集合。目前針對頻繁項集的演算法,主要有Apriori,FP-Growth和Eclat演算法。

3,

FP樹的結構和建數過程:

假設最小支援度是40%,那麼b e j c的支援度就是50%,a是67%。過濾掉非頻繁項集後即如上表中最後一列。並使這些元素按照出現的次序排序。

這一步其實就是預處理,減少需要計算的頻繁項集的候選集,排序的目的是頻繁項集關注的是組合而不是排列,在後面生成樹的時候需要避免生成重複不必要的分支。

然後遍歷過濾後的候選集和出現的次序,構建FP-tree如下:

發現頻繁項集的過程和Apriori一樣,也是逐步遞增的發現,即先找到1頻繁項集,然後再在1頻繁項集的基礎上找2頻繁項集。基於上面的FP-tree樹,也就是支援度大於百分之40的所有元素,在找2頻繁項集時,需要先抽取條件模式基(以每個頻繁項為結尾的,在FP樹中所有的字首路徑)

然後對上圖的頻繁項集元素,用它的條件模式基建立FP樹,再找2頻繁項集。

建樹的過程和上面是一樣的,然後再得到2頻繁項集。依次類推,挖掘K頻繁項集只需要在K-1頻繁項集上挖掘,重複上面的過程即可。

 從 這裡就可以發現一旦建立了FP樹之後就可以不斷遞迴挖掘K頻繁項集,對於Hadoop就會產生多次IO操作,嚴重影響程式執行效率,而Spark這種彈性 式記憶體計算框架可以試中間輸出和結果儲存在記憶體中,不需要重複讀寫HDFS,所以Spark能更好地適用於資料探勘需要遞迴的Map-Reduce演算法。

4,

在spark中:

分析大規模資料集的第一個步驟通常是挖掘頻繁專案,專案集,亞序列或其他子結構,這在資料探勘中作為一個活躍的研究主題已經很多年了。

MLlib提供了頻繁模糊挖掘的並行實現---FP-growth演算法。

FP-growth

給定一個交易資料集,FP-growth的第一步驟是計算專案的頻率,並確定頻繁專案。

FP-growth雖然與Apriori類演算法有相同的設計,但是FP-growth的第二步使用字尾樹(FP樹)結構對交易資料編碼且不會顯示生成候選集(生成的候選集通常開銷非常大)。

第二步之後,就可以從FP-growth的工作分發到其他機器,比單機執行有更好的效果

FP-Growth有以下引數:

minSupport:專案集被確定為頻繁的最小數量

numPartition:分發任務的數量

程式碼如下:

  1. package test
  1. import org.apache.spark.mllib.fpm.FPGrowth
  1. import org.apache.spark.{SparkConf, SparkContext}
  1. /**
  1.   * Created by zengxiaosen on 16/6/13.
  1.   */
  1. object FP_growth_test {
  1.   def main(args: Array[String]) {
  1.     val conf=new SparkConf()
  1.     conf.setMaster("local")
  1.     conf.setAppName("FP_growth")
  1.     val sc=new SparkContext(conf)
  1.     //資料格式是:物品1,物品2,物品3
  1. val data_path="/Users/zengxiaosen/test/item.txt"
  1.     val data=sc.textFile(data_path)
  1.     val examples=data.map(_.split(" ")).cache()
  1.     //建立模型
  1. val minSupport=0.7
  1.     val numPartition=10
  1.     val model=new FPGrowth()
  1.       .setMinSupport(minSupport)
  1.       .setNumPartitions(numPartition)
  1.       .run(examples)
  1.     //列印結果
  1. model.freqItemsets.collect().foreach { itemset =>
  1.       println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
  1.     }
  1.   }
  1. }

結果如下:

16/06/13 15:44:08 INFO TaskSetManager: Finished task 9.0 in stage 4.0 (TID 22) in 3 ms on localhost (10/10)

16/06/13 15:44:08 INFO DAGScheduler: ResultStage 4 (collect at FP_growth_test.scala:32) finished in 0.047 s

16/06/13 15:44:08 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 

16/06/13 15:44:08 INFO DAGScheduler: Job 2 finished: collect at FP_growth_test.scala:32, took 0.089328 s

[x], 3

[z], 3

[s], 3

[s,x], 3

16/06/13 15:44:08 INFO SparkContext: Invoking stop() from shutdown hook

16/06/13 15:44:08 INFO SparkUI: Stopped Spark web UI at http://192.168.201.114:4040

16/06/13 15:44:08 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/06/13 15:44:08 INFO MemoryStore: MemoryStore cleared

16/06/13 15:44:08 INFO BlockManager: BlockManager stopped

16/06/13 15:44:08 INFO BlockManagerMaster: BlockManagerMaster stopped

16/06/13 15:44:08 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/06/13 15:44:08 INFO SparkContext: Successfully stopped SparkContext

16/06/13 15:44:08 INFO ShutdownHookManager: Shutdown hook called

16/06/13 15:44:08 INFO ShutdownHookManager: Deleting directory /private/var/folders/8f/z66_0clj43s9sq2207bd78jw0000gn/T/spark-c824ae29-d0c5-4f56-97bc-58e1f915f0cf

Process finished with exit code 0