Spark JDK 8 word count example
阿新 · Published: 2017-12-25
There are Spark + Java 8 example repositories on GitHub:
https://github.com/ypriverol/spark-java8
https://github.com/ihr/java8-spark
If you are learning Java 8 lambda expressions, they are worth studying; compare them against your own code as well.
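Before the Spark example, here is a minimal sketch of what Java 8 lambda expressions buy you over the pre-Java-8 anonymous-inner-class style (the class name `LambdaComparison` is illustrative, not from the repositories above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LambdaComparison {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("spark", "jdk8");

        // Pre-Java-8 style: an anonymous inner class implementing Function
        Function<String, Integer> lenOld = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };

        // Java 8 style: a lambda expression with the same behavior
        Function<String, Integer> lenNew = s -> s.length();

        List<Integer> lengths = words.stream().map(lenNew).collect(Collectors.toList());
        System.out.println(lengths); // prints [5, 4]
    }
}
```

Spark's Java API benefits from exactly this shortening: every `flatMap`, `mapToPair`, and `reduceByKey` call below takes a function object that would otherwise be a multi-line anonymous class.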
Java 8 code example:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.east.spark.stream;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public final class JavaWordCount2 {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // Hard-coded input path for local testing; note this overrides any
        // command-line argument, so the usage check below never fires.
        args = new String[] { "D:/tmp/spark/test.txt" };

        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        SparkSession spark = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();

        // SparkConf conf = new SparkConf().setAppName("ingini-spark-java8").setMaster("local");

        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

        // Split each line into words; in Spark 2.x, flatMap expects an Iterator
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator());

        // Map each word to (word, 1), then sum the counts per key
        JavaPairRDD<String, Integer> counts = words.mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((x, y) -> x + y);

        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ":== " + tuple._2());
        }

        spark.stop();
    }
}
A more concise version:
// REGEX is the assumed split pattern (it is not defined in the original snippet), e.g. a single space
String REGEX = " ";

JavaRDD<String> lines = sc.textFile("src/main/resources/a.txt");
JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split(REGEX)).iterator()) // Spark 2.x flatMap expects an Iterator
        .mapToPair(word -> new Tuple2<>(word, 1)) // parameterized Tuple2 avoids the (Integer) casts
        .reduceByKey((x, y) -> x + y)
        .sortByKey();

counts.foreach(tuple -> System.out.println(tuple._1 + ":" + tuple._2));
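For comparison, the same counting logic can be expressed in plain Java 8 streams with no Spark dependency at all, which makes the lambda syntax easy to study in isolation (the class name `StreamWordCount` and the `count` helper are illustrative, not from the original post):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamWordCount {
    // Split each line on spaces, then group identical words and count them;
    // groupingBy + counting plays the role of mapToPair + reduceByKey.
    static Map<String, Long> count(Stream<String> lines) {
        return lines
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(Stream.of("a b a", "b c"));
        counts.forEach((w, n) -> System.out.println(w + ":== " + n));
    }
}
```

The difference, of course, is that the stream version runs on one JVM, while the Spark RDD version distributes the same `flatMap`/`reduce` shape across a cluster.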