
Spark JDK 8 word-count example


There are spark-java8 example repositories on GitHub:

https://github.com/ypriverol/spark-java8

https://github.com/ihr/java8-spark

If you are learning Java 8 lambda expressions, these are worth a look; compare the styles for yourself as well.
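Before looking at the Spark version, the same word-count shape can be sketched in plain Java 8 streams, with no Spark dependency. This is an illustrative sketch (the input string and class name are made up here): `flatMap`-style splitting followed by grouping and counting, all driven by lambdas.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class StreamWordCount {
    public static void main(String[] args) {
        String text = "to be or not to be";
        // Split into words, then group identical words and count them,
        // analogous to mapToPair + reduceByKey in the Spark code below.
        Map<String, Long> counts = Arrays.stream(text.split(" "))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        System.out.println(counts); // e.g. {not=1, be=2, or=1, to=2}
    }
}
```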

Java 8 code example:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.east.spark.stream;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public final class JavaWordCount2 {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // Hard-coded input path for local testing; note this overrides any
        // command-line argument, so the usage check below can never trigger.
        args = new String[] { "D:/tmp/spark/test.txt" };

        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        SparkSession spark = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();

        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

        // Split each line on spaces; in Spark 2.x, flatMap expects an Iterator.
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator());

        JavaPairRDD<String, Integer> counts = words.mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((x, y) -> x + y);

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ":== " + tuple._2());
        }

        spark.stop();
    }
}

A more concise version:

// sc is a JavaSparkContext; the original snippet's undefined REGEX is
// replaced with a literal single space here.
JavaRDD<String> lines = sc.textFile("src/main/resources/a.txt");
JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((x, y) -> x + y)
        .sortByKey();

counts.foreach(tuple -> System.out.println(tuple._1 + ":" + tuple._2));
