
Common Spark Action Operators
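Actions are the RDD operators that actually trigger a Spark job: transformations such as map are lazy and only record lineage, while an action such as reduce, collect, count, take, saveAsTextFile, or countByKey forces execution and returns a result to the driver (or writes it to storage). As a minimal, self-contained sketch of that distinction (the object name LazyVsAction is made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object LazyVsAction {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy-vs-action").setMaster("local"))
    // Transformation: nothing runs yet; Spark only records the lineage.
    val doubled = sc.parallelize(Array(1, 2, 3, 4)).map(_ * 2)
    // Action: triggers the actual job and returns the result to the driver.
    System.out.println(doubled.count()) // prints 4
    sc.stop()
  }
}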


Java version
package com.huanfion.Spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ActionJava {
    public static JavaSparkContext getsc() {
        SparkConf conf = new SparkConf().setAppName("action").setMaster("local");
        return new JavaSparkContext(conf);
    }

    public static void main(String[] args) {
        // reduce();
        // collect();
        // count();
        // take();
        countByKey();
    }

    // reduce: aggregates all elements with a binary function; here it sums 1+2+3+4.
    public static void reduce() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        int reducevalue = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(reducevalue);
    }

    // collect: pulls the whole RDD back to the driver as a local List.
    public static void collect() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        System.out.println(rdd.collect());
    }

    // saveAsTextFile: writes one text file per partition under the given directory.
    public static void save() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        rdd.saveAsTextFile("hdfs://"); // the output directory must not already exist, or the save fails
    }

    // count: returns the number of elements in the RDD.
    public static void count() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        System.out.println(rdd.count());
    }

    // take(n): returns the first n elements to the driver.
    public static void take() {
        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        JavaRDD<Integer> rdd = getsc().parallelize(list);
        List<Integer> takevalue = rdd.take(2);
        System.out.println(takevalue);
    }

    // countByKey: counts how many times each key occurs in a pair RDD.
    public static void countByKey() {
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<>("class_1", 91),
                new Tuple2<>("class_2", 78),
                new Tuple2<>("class_1", 99),
                new Tuple2<>("class_2", 76),
                new Tuple2<>("class_2", 90));
        JavaPairRDD<String, Integer> rdd = getsc().parallelizePairs(list);
        Map<String, Long> values = rdd.countByKey();
        values.forEach((x, y) -> System.out.println(x + ":" + y));
    }
}
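Note that only one of the example methods should be run per JVM: getsc() builds a brand-new local SparkContext on every call, and Spark allows only one active SparkContext per JVM. With the sample data above, countByKey() should print class_1:2 and class_2:3 (the iteration order of the returned Map is not guaranteed).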

Scala version
package com.huanfion.Spark
 
import org.apache.spark.{SparkConf, SparkContext}
 
object ActionScala {
  def getsc: SparkContext = {
    val sparkconf = new SparkConf().setAppName("action").setMaster("local")
    new SparkContext(sparkconf)
  }
 
  def main(args: Array[String]): Unit = {
    //    reduce
    //    count
    //    collect
    //    take
    countByKey
  }
 
  // reduce: aggregates all elements with a binary function; here it sums to 10.
  def reduce = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    System.out.println(rdd.reduce(_ + _))
  }
 
  // count: returns the number of elements in the RDD.
  def count = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    System.out.println(rdd.count())
  }
 
  // collect: pulls the whole RDD back to the driver as a local Array.
  def collect = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    val value = rdd.collect()
    value.foreach(x => System.out.println(x))
  }
 
  // take(n): returns the first n elements to the driver.
  def take = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    val value = rdd.take(2)
    value.foreach(x => System.out.println(x))
  }
  // countByKey: counts how many times each key occurs in a pair RDD.
  def countByKey = {
    val list = Array(("class_1", 91),
      ("class_2", 78),
      ("class_1", 99),
      ("class_2", 76),
      ("class_2", 90))
    val rdd = getsc.parallelize(list)
    val countvalue = rdd.countByKey()
    countvalue.foreach(x => System.out.println(x._1 + ":" + x._2))
  }
}
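The Scala object above has no counterpart to the Java save() method; a minimal sketch of one, meant to sit inside ActionScala, might look like the following (the HDFS path is a made-up placeholder):

  def save = {
    val list = Array(1, 2, 3, 4)
    val rdd = getsc.parallelize(list)
    // Writes one part-file per partition; fails if the target directory already exists.
    rdd.saveAsTextFile("hdfs://namenode:8020/tmp/action-output") // hypothetical path
  }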
