1. 程式人生 > >spark機器學習實現之fpgrowth

spark機器學習實現之fpgrowth

很久之前就像寫一些關於資料探勘演算法的東西,因為懶現在才開始動手,因為fpgrowth演算法在mlib中的實現
相對比較簡單,所以打算先拿它下手。
關於fpgrowth的原理本人說的也不專業, 推薦

這裡主要寫一下在mlib當中,實現的一個過程
先上程式碼

Logger.getLogger("org").setLevel(Level.ERROR)
    conf.setAppName("fpgrowth")
    //設定引數
    //最小支援度
    val minSupport = 0.1
    //最小置信度
    val minConfidence = 0.8
    //資料分割槽
val numPartitions = 2

之前定義好了一個類,裡邊對conf的基礎設定做了定義,直接繼承,然後配置appname
1.設定好支援度和置信度閥值
2.設定好資料的分割槽

 val data = sc.textFile("/usr/local/soft/data/fpgrowth.csv")
    //把資料通過空格分割
    val transactions = data.map(x => x.split(","))
    transactions.cache()

取數,我為了快速實現,還是把資料放到了資料檔案裡邊了,在資料量比較大的情況下,還是
推薦hive或者hdfs上去存放資料。

3567,3577,3645,3653
17805
3665,3669
13341,16617,16661,17291,17535,17549,17553,17921,17931,18205,18225,3467,3567,3699,3771,3785,3943,3959,3981,4019,8623,8635,8651,8657,8663
12369,12371,12585,12587,12589,12593,12597,12605,2188,4335,6597

以上是部分資料展現,記錄的是使用者的瀏覽情況

//建立一個FPGrowth的演算法實列
    val fpg = new FPGrowth()
    //設定訓練時候的最小支援度和資料分割槽
fpg.setMinSupport(minSupport) fpg.setNumPartitions(numPartitions)

建立例項並且設定分割槽和支援度閥值

 //把資料帶入演算法中
    val model = fpg.run(transactions)

跑演算法,這裡其實才是整段的核心部分,返回型別是FPGrowthModel[String],關於這個型別,
如果有時間我們再來仔細的說一說

//檢視所有的頻繁項集,並且列出它出現的次數
    model.freqItemsets.collect().foreach(itemset => {
      println(itemset.items.mkString("[", ",", "]") + "," + itemset.freq)
    })

如果不放心可以看看自己的頻繁項集都是什麼樣子的

val Arrayrec = new ArrayBuffer[String];
    model.generateAssociationRules(minConfidence).collect().foreach(rule => {
      if (rule.antecedent.length.==(1)) {
        println(rule.antecedent.mkString + "-->" +
          rule.consequent.mkString + "-->" + rule.confidence)
        val rec = rule.antecedent.mkString + "," + rule.consequent.mkString+","+rule.confidence

        Arrayrec += rec
      }
    })

可以打印出來看看,其中的關聯規則都是怎麼樣子的。antecedent表示前項,consequent表示後項,
,confidence表示規則的置信度。

12539-->13345-->1.0
12539-->12551-->1.0
12539-->13705-->1.0
12539-->13695-->1.0
12539-->12533-->1.0
12539-->13697-->1.0
12539-->12535-->1.0

大概的規則就是這樣了,因為我自己的需要,我選擇了前項是1的。
資料還是要儲存的,寫個函式存到mysql裡邊,就可以之前前臺查詢了

 def writeRecResultToMysql(uid: ArrayBuffer[String], sqlContext: SQLContext, sc: SparkContext) {
      //val uidString = uid.map(x => x.split(",")(0).toString() + "," + x.split(",")(1).toString())
      import sqlContext.implicits._
      val uidDFArray = sc.parallelize(uid)
      val uidDF = uidDFArray.map(x => x.split(",")).map(x => FpgrowthResult(x(0).trim().toInt, x(1).trim.toInt,x(2).trim().toDouble)).toDF
      uidDF.write.mode(SaveMode.Append).jdbc(jdbcURL, recResultTable_fpGrowth, prop)

    }

要是還想看看規則生成了多少條,也可以列印一下

    println(model.generateAssociationRules(minConfidence).collect().length)

好了,至此簡單的邏輯應該沒什麼問題了。
ps:簡單邏輯實現過程中要是存在什麼問題,還請各位大牛指正