Java 使用 Spark 2 開發本地測試的 WordCount 程式
阿新 • • 發佈:2018-11-11
package cn.spark.study.core; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; /** * 使用java開發本地測試的wordCount程式 * @author meng * */ public class WordCount { private static final Logger logger = LoggerFactory.getLogger(WordCountCluster.class); public static void main(String[] args) { //建立sparkConf物件,設定spark應用的配置資訊 SparkConf conf = new SparkConf() .setAppName("WordCount") .setMaster("local"); //spark應用程式要連線的spark叢集的master節點的url,local代表的是本地執行 //.setMaster("spark://ip:port"); //建立JavaSparkContext物件 JavaSparkContext sc = new JavaSparkContext(conf); //針對輸入源(hdfs檔案、本地檔案等)建立一個初始的RDD JavaRDD<String> lines = sc.textFile("G://bigData/doc/test/meng.txt"); //對初始RDD進行transformation操作,如flatMap、mapToPair、reduceByKey //將每一行拆分成單個的單詞 //FlatMapFunction的兩個泛型引數代表了輸入輸出的型別 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); //需要將每一個單詞對映為(單詞,1)的格式 //JavaPairRDD的兩個引數代表了Tuple元素的第一個值和第二個值 JavaPairRDD<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String,Integer>(word,1); } }); //需要以單詞作為key,統計每個單詞出現的次數 JavaPairRDD<String,Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, 
Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //foreach觸發程式執行 wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> wordCount) throws Exception { System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times."); } }); sc.close(); } }