Spark中CountByValue運算元Updatestatebykey運算元
阿新 • • 發佈:2019-01-25
一.在spark中,我們用了groupbykey運算元之後,一個key對應很多的運算元。我們想數一數這個key內部value的個數,我們就可以用countbyvalue。
package com.latrobe.spark import org.apache.spark.{SparkContext,SparkConf} /** * Created by spark on 15-1-18. 統計出集合中每個元素的個數 */ object CountByValue{def main(args:Array[String]){val conf=new SparkConf().setAppName("spark-demo").setMaster("local")val sc=new SparkContext(conf) val xx=sc.parallelize(List(1,1,1,1,2,2,3,6,5,9)) // 列印結果:Map(2 -> 2, 5 -> 1, 1 -> 4, 9 -> 1, 3 -> 1, 6 -> 1) println(xx.countByValue())}}
二.在spark中,我們用了groupbykey運算元之後,一個key對應很多的運算元。我們想將這個key對用的value的值加起來,這時候就可以用updatestatebykey運算元。
package com.bjsxt; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import com.google.common.base.Optional; import scala.Tuple2; import scala.actors.threadpool.Arrays; public class UpdateStateByKeyOperator { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("test").setMaster("local[2]"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); /** * 去除日誌的重複 */ jsc.sparkContext().setLogLevel("ERROR"); /** * 設定checkpoint目錄: 多久會接收記憶體中的資料(每一個key所對應的狀態)寫入到磁碟上呢? * 如果你的batchinterval小於10S,那麼10S會將記憶體中的資料寫入到磁碟一份 * 如果batchinterval大於10S,那麼就以batchinterval為準 這樣做是為了防止頻繁的寫HDFS * 設定checkpoint目錄的兩種方式: 1.jsc.checkpoint("./checkpoint"); 2.JavaSparkContext * sc=jsc.sparkContxt(); sc.setCheckpointDir("./checkpoint"); * */ jsc.checkpoint("./checkpoint"); /** * 從Linux端接收資料 */ JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node01", 9999); /** * 呼叫flatmap運算元進行切分 */ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } }); /** * 呼叫mapToPair 進行分類 */ JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); /** * 呼叫updateStateByKey運算元 * */ JavaPairDStream<String, Integer> counts = ones .updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception { /** * values:經過分組最後 這個key所對應的value [1,1,1,1,1] state:這個key在本次之前之前的狀態 */ Integer updateValue = 0; if (state.isPresent()) { updateValue = state.get(); } for (Integer value : values) { updateValue += value; } return Optional.of(updateValue); } }); // output operator counts.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); } }