1. 程式人生 > >Spark用Java實現的WordCount

Spark用Java實現的WordCount

java版本的sparkWordCount

/**
 * Java版本的spark WordCount
 */
public class JavaWordCount {
    public static void main(String[] args) {
        //1.先建立conf物件進行配置,主要是設定名稱,為了設定執行模式
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
        //2.建立context物件
        JavaSparkContext jsc =
new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("dir/file.txt"); //3.進行切分資料 --flatMapFunction是具體實現類 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { //Iterable是所有集合的超級父介面 @Override public
Iterable<String> call(String s) throws Exception { List<String> splited = Arrays.asList(s.split(" ")); return splited; } }); //4.將資料生成元組 //第一個泛型是輸入的資料型別,後兩個引數是輸出引數元組的資料 final JavaPairRDD<String, Integer> tuples =
words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); //5.聚合 JavaPairRDD<String, Integer> sumed = tuples.reduceByKey(new Function2<Integer, Integer, Integer>() { /** * * @param v1 相同key對應的value * @param v2 相同key對應的value * @return * @throws Exception */ @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); //因為Java API 沒有提供sortedBy 運算元,此時需要將元組中的資料進行位置調換,排完序再換回來 //第一次交換是為了排序 JavaPairRDD<Integer, String> swaped = sumed.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception { return tup.swap(); } }); //排序 JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false); //第二次交換是為了最終結果 <單詞,數量> JavaPairRDD<String, Integer> res = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception { return tup.swap(); } }); System.out.println(res.collect()); res.saveAsTextFile("out4"); jsc.stop(); } }