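Spark MLlib Test
1. Understanding the FPGrowth algorithm
spark.mllib provides a parallel FP-growth algorithm. It belongs to the family of association-rule algorithms [association rule: given two disjoint, non-empty sets A and B, if A => B holds, then A => B is called an association rule; the often-cited {beer} --> {diapers} is one such rule] and is commonly used to mine frequent itemsets. There are plenty of introductions to the algorithm online, so I will not repeat them here. The main thing is to get a few concepts straight: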
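1) Support: support(A => B) = P(A ∩ B) = |A ∩ B| / |N|, the probability that A and B occur together in dataset D, where |N| is the total number of transactions;
2) Confidence: confidence(A => B) = P(B|A) = |A ∩ B| / |A|, the probability that B occurs among the transactions in D where A occurs;
3) Lift: lift(A => B) = P(B|A) / P(B) = (|A ∩ B| / |A|) / (|B| / |N|), the ratio between the probability of B given A and the probability of B without the condition A.
From the above, support indicates how likely the rule is to show up at all, while confidence indicates how reliably B follows from A.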
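An example: 10,000 customers made purchases; 1,000 bought diapers, 2,000 bought beer, and 800 bought both diapers and beer.
1) Support: how likely the itemset is to appear among all transactions, that is, the probability that a transaction contains both X and Y. The support of diapers and beer is 800/10000 = 8%.
2) Confidence: the probability that Y occurs given that X has occurred. The confidence of diapers -> beer is 800/1000 = 80%, and the confidence of beer -> diapers is 800/2000 = 40%.
3) Lift: the ratio of the probability of containing Y given X (the confidence of X -> Y) to the probability of containing Y without that condition: confidence(diapers => beer) / P(beer) = 80% / (2000/10000) = 4. A lift of 1 means the two items are unrelated; a lift above 1 means X makes Y more likely.
Support and confidence together identify strong association rules; lift then tells us which of those strong rules are actually meaningful.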
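The arithmetic in this example is easy to check with a few lines of plain Java. This is only a sketch: the class name RuleMetrics and its method names are made up for illustration and are not part of Spark.

```java
public class RuleMetrics {
    // support(A => B) = |A ∩ B| / N
    static double support(long both, long total) {
        return (double) both / total;
    }

    // confidence(A => B) = |A ∩ B| / |A|
    static double confidence(long both, long antecedent) {
        return (double) both / antecedent;
    }

    // lift(A => B) = confidence(A => B) / P(B), where P(B) = |B| / N
    static double lift(long both, long antecedent, long consequent, long total) {
        return confidence(both, antecedent) / ((double) consequent / total);
    }

    public static void main(String[] args) {
        // Numbers from the diapers/beer example.
        long total = 10000, diapers = 1000, beer = 2000, both = 800;

        System.out.println("support(diapers, beer)       = " + support(both, total));
        System.out.println("confidence(diapers => beer)  = " + confidence(both, diapers));
        System.out.println("confidence(beer => diapers)  = " + confidence(both, beer));
        System.out.println("lift(diapers => beer)        = " + lift(both, diapers, beer, total));
    }
}
```

Since the lift comes out well above 1, buying diapers makes buying beer considerably more likely than it is for a customer picked at random.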
Let's go straight to an example. The dataset is as follows (one transaction per line, items separated by spaces):
r z h k p
z y x w v u t s
s x o n r
x z y m t s q e
z
x z y r q t p
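Before handing this dataset to Spark, it can help to see which single items are frequent at all. The sketch below counts items in plain Java and keeps those that reach a minimum support of 0.2. The class name FrequentItems is hypothetical, and converting the fraction into a count as ceil(minSupport × N) is my assumption about how the threshold is applied.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FrequentItems {
    // The six transactions listed above.
    static final List<List<String>> TRANSACTIONS = Arrays.asList(
            Arrays.asList("r", "z", "h", "k", "p"),
            Arrays.asList("z", "y", "x", "w", "v", "u", "t", "s"),
            Arrays.asList("s", "x", "o", "n", "r"),
            Arrays.asList("x", "z", "y", "m", "t", "s", "q", "e"),
            Arrays.asList("z"),
            Arrays.asList("x", "z", "y", "r", "q", "t", "p"));

    // Count each item, then drop the items below the minimum count.
    static Map<String, Integer> frequentItems(List<List<String>> transactions, double minSupport) {
        Map<String, Integer> counts = new TreeMap<>();
        for (List<String> transaction : transactions) {
            for (String item : transaction) {
                counts.merge(item, 1, Integer::sum);
            }
        }
        // Assumed threshold: an item is frequent if it occurs in at least ceil(minSupport * N) transactions.
        long minCount = (long) Math.ceil(minSupport * transactions.size());
        counts.values().removeIf(c -> c < minCount);
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(frequentItems(TRANSACTIONS, 0.2));
    }
}
```

With six transactions and a minimum support of 0.2 the threshold is 2, so items that occur only once, such as h or k, drop out, while z (five occurrences) and x (four) stay.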
2. Code implementation. Create a Maven project in IDEA, then debug in local mode with the following code:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

import java.util.Arrays;
import java.util.List;

public class FPDemo {
    public static void main(String[] args) {
        String data_path;            // path to the dataset
        double minSupport = 0.2;     // minimum support
        int numPartition = 10;       // number of data partitions
        double minConfidence = 0.8;  // minimum confidence

        if (args.length < 1) {
            System.out.println("<input data_path>");
            System.exit(-1);
        }
        data_path = args[0];
        if (args.length >= 2)
            minSupport = Double.parseDouble(args[1]);
        if (args.length >= 3)
            numPartition = Integer.parseInt(args[2]);
        if (args.length >= 4)
            minConfidence = Double.parseDouble(args[3]);

        SparkConf conf = new SparkConf().setAppName("FPDemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // load the data and split each line on spaces into a transaction
        JavaRDD<List<String>> transactions = sc.textFile(data_path)
                .map(new Function<String, List<String>>() {
                    public List<String> call(String s) throws Exception {
                        String[] parts = s.split(" ");
                        return Arrays.asList(parts);
                    }
                });

        // create the FPGrowth instance, setting the minimum support and the number of partitions used for training
        FPGrowth fpGrowth = new FPGrowth().setMinSupport(minSupport).setNumPartitions(numPartition);
        FPGrowthModel<String> model = fpGrowth.run(transactions); // run the algorithm

        // list every frequent itemset together with the number of times it occurs
        for (FPGrowth.FreqItemset<String> itemset : model.freqItemsets().toJavaRDD().collect())
            System.out.println("[" + itemset.javaItems() + "]," + itemset.freq());

        // filter out the strong rules by confidence
        // antecedent: the left-hand side of the rule
        // consequent: the right-hand side of the rule
        // confidence: the confidence of the rule
        for (AssociationRules.Rule<String> rule : model.generateAssociationRules(minConfidence).toJavaRDD().collect())
            System.out.println(rule.javaAntecedent() + "=>" + rule.javaConsequent() + ", " + rule.confidence());
    }
}
Using the code above as-is in a Maven project will run into problems, so the following dependencies need to be added to the project:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>
Running in local mode gives the following result:
[t, s, y]=>[x], 1.0
[t, s, y]=>[z], 1.0
[y, x, z]=>[t], 1.0
[y]=>[x], 1.0
[y]=>[z], 1.0
[y]=>[t], 1.0
[p]=>[r], 1.0
[p]=>[z], 1.0
[q, t, z]=>[y], 1.0
[q, t, z]=>[x], 1.0
[q, y]=>[x], 1.0
[q, y]=>[z], 1.0
[q, y]=>[t], 1.0
[t, s, x]=>[y], 1.0
[t, s, x]=>[z], 1.0
[q, t, y, z]=>[x], 1.0
[q, t, x, z]=>[y], 1.0
[q, x]=>[y], 1.0
[q, x]=>[t], 1.0
[q, x]=>[z], 1.0
[t, x, z]=>[y], 1.0
[x, z]=>[y], 1.0
[x, z]=>[t], 1.0
[p, z]=>[r], 1.0
[t]=>[y], 1.0
[t]=>[x], 1.0
[t]=>[z], 1.0
[y, z]=>[x], 1.0
[y, z]=>[t], 1.0
[p, r]=>[z], 1.0
[t, s]=>[y], 1.0
[t, s]=>[x], 1.0
[t, s]=>[z], 1.0
[q, z]=>[y], 1.0
[q, z]=>[t], 1.0
[q, z]=>[x], 1.0
[q, y, z]=>[x], 1.0
[q, y, z]=>[t], 1.0
[y, x]=>[z], 1.0
[y, x]=>[t], 1.0
[q, x, z]=>[y], 1.0
[q, x, z]=>[t], 1.0
[t, y, z]=>[x], 1.0
[q, y, x]=>[z], 1.0
[q, y, x]=>[t], 1.0
[q, t, y, x]=>[z], 1.0
[t, s, x, z]=>[y], 1.0
[s, y, x]=>[z], 1.0
[s, y, x]=>[t], 1.0
[s, x, z]=>[y], 1.0
[s, x, z]=>[t], 1.0
[q, y, x, z]=>[t], 1.0
[s, y]=>[x], 1.0
[s, y]=>[z], 1.0
[s, y]=>[t], 1.0
[q, t, y]=>[x], 1.0
[q, t, y]=>[z], 1.0
[t, y]=>[x], 1.0
[t, y]=>[z], 1.0
[t, z]=>[y], 1.0
[t, z]=>[x], 1.0
[t, s, y, x]=>[z], 1.0
[t, y, x]=>[z], 1.0
[q, t]=>[y], 1.0
[q, t]=>[x], 1.0
[q, t]=>[z], 1.0
[q]=>[y], 1.0
[q]=>[t], 1.0
[q]=>[x], 1.0
[q]=>[z], 1.0
[t, s, z]=>[y], 1.0
[t, s, z]=>[x], 1.0
[t, x]=>[y], 1.0
[t, x]=>[z], 1.0
[s, z]=>[y], 1.0
[s, z]=>[x], 1.0
[s, z]=>[t], 1.0
[s, y, x, z]=>[t], 1.0
[s]=>[x], 1.0
[t, s, y, z]=>[x], 1.0
[s, y, z]=>[x], 1.0
[s, y, z]=>[t], 1.0
[q, t, x]=>[y], 1.0
[q, t, x]=>[z], 1.0
[r, z]=>[p], 1.0
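A few of these rules can be verified by hand against the six transactions. The sketch below recomputes the confidence of a rule in plain Java, without Spark; the class name RuleCheck and its helper methods are hypothetical and exist only for this check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RuleCheck {
    // The six transactions from the dataset in section 1.
    static final List<List<String>> TRANSACTIONS = Arrays.asList(
            Arrays.asList("r", "z", "h", "k", "p"),
            Arrays.asList("z", "y", "x", "w", "v", "u", "t", "s"),
            Arrays.asList("s", "x", "o", "n", "r"),
            Arrays.asList("x", "z", "y", "m", "t", "s", "q", "e"),
            Arrays.asList("z"),
            Arrays.asList("x", "z", "y", "r", "q", "t", "p"));

    // Number of transactions that contain every item in `items`.
    static long count(List<List<String>> transactions, List<String> items) {
        return transactions.stream().filter(t -> t.containsAll(items)).count();
    }

    // confidence(A => B) = count(A ∪ B) / count(A)
    static double confidence(List<List<String>> transactions,
                             List<String> antecedent, List<String> consequent) {
        List<String> union = new ArrayList<>(antecedent);
        union.addAll(consequent);
        return (double) count(transactions, union) / count(transactions, antecedent);
    }

    public static void main(String[] args) {
        // Rules that appear in the output above.
        System.out.println("[p]=>[r]    " + confidence(TRANSACTIONS, Arrays.asList("p"), Arrays.asList("r")));
        System.out.println("[r, z]=>[p] " + confidence(TRANSACTIONS, Arrays.asList("r", "z"), Arrays.asList("p")));
        // A rule that does not appear in the output above.
        System.out.println("[z]=>[x]    " + confidence(TRANSACTIONS, Arrays.asList("z"), Arrays.asList("x")));
    }
}
```

Every transaction that contains p also contains r, which matches the confidence of 1.0 reported above. By contrast, z occurs in five transactions but only three of them contain x, so [z]=>[x] falls below the 0.8 minimum confidence and is absent from the list.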
3. Deploying to a Spark cluster. The code is modified as follows:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

import java.util.Arrays;
import java.util.List;

public class FPDemo {
    public static void main(String[] args) {
        String data_path;            // path to the dataset
        double minSupport = 0.2;     // minimum support
        int numPartition = 10;       // number of data partitions
        double minConfidence = 0.8;  // minimum confidence

        if (args.length < 1) {
            System.out.println("<input data_path>");
            System.exit(-1);
        }
        data_path = args[0];
        if (args.length >= 2)
            minSupport = Double.parseDouble(args[1]);
        if (args.length >= 3)
            numPartition = Integer.parseInt(args[2]);
        if (args.length >= 4)
            minConfidence = Double.parseDouble(args[3]);

        // the changed line: setMaster("local") is removed so the master is no longer hard-coded
        SparkConf conf = new SparkConf().setAppName("FPDemo");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // load the data and split each line on spaces into a transaction
        JavaRDD<List<String>> transactions = sc.textFile(data_path)
                .map(new Function<String, List<String>>() {
                    public List<String> call(String s) throws Exception {
                        String[] parts = s.split(" ");
                        return Arrays.asList(parts);
                    }
                });

        // create the FPGrowth instance, setting the minimum support and the number of partitions used for training
        FPGrowth fpGrowth = new FPGrowth().setMinSupport(minSupport).setNumPartitions(numPartition);
        FPGrowthModel<String> model = fpGrowth.run(transactions); // run the algorithm

        // list every frequent itemset together with the number of times it occurs
        for (FPGrowth.FreqItemset<String> itemset : model.freqItemsets().toJavaRDD().collect())
            System.out.println("[" + itemset.javaItems() + "]," + itemset.freq());

        // filter out the strong rules by confidence
        // antecedent: the left-hand side of the rule
        // consequent: the right-hand side of the rule
        // confidence: the confidence of the rule
        for (AssociationRules.Rule<String> rule : model.generateAssociationRules(minConfidence).toJavaRDD().collect())
            System.out.println(rule.javaAntecedent() + "=>" + rule.javaConsequent() + ", " + rule.confidence());
    }
}
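Next, package the project as a JAR in IDEA: build the JAR from the toolbar, upload it to the cluster, and run the submit command there to get the result.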