Spark用Java實現的WordCount
阿新 • • 發佈:2019-01-03
java版本的sparkWordCount
/**
* Java版本的spark WordCount
*/
public class JavaWordCount {
public static void main(String[] args) {
//1.先建立conf物件進行配置,主要是設定名稱,為了設定執行模式
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
//2.建立context物件
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("dir/file.txt");
//3.進行切分資料 --flatMapFunction是具體實現類
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
//Iterable是所有集合的超級父介面
@Override
public Iterable<String> call(String s) throws Exception {
List<String> splited = Arrays.asList(s.split(" "));
return splited;
}
});
//4.將資料生成元組
//第一個泛型是輸入的資料型別,後兩個引數是輸出引數元組的資料
final JavaPairRDD<String, Integer> tuples = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
//5.聚合
JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer, Integer>() {
/**
*
* @param v1 相同key對應的value
* @param v2 相同key對應的value
* @return
* @throws Exception
*/
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
//因為Java API 沒有提供sortedBy 運算元,此時需要將元組中的資料進行位置調換,排完序再換回來
//第一次交換是為了排序
JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
return tup.swap();
}
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//第二次交換是為了最終結果 <單詞,數量>
JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {
return tup.swap();
}
});
System.out.println(res.collect());
res.saveAsTextFile("out4");
jsc.stop();
}
}