spark2.x由淺入深深到底系列六之RDD java api調用scala api的原理
阿新 • • 發佈:2017-09-18
spark 大數據 javaapi 老湯 rdd
RDD java api其實底層是調用了scala的api來實現的,所以我們有必要對java api是怎麽樣去調用scala api,我們先自己簡單的實現一個scala版本和java版本的RDD和SparkContext
一、簡單實現scala版本的RDD和SparkContext
class RDD[T](value: Seq[T]) { //RDD的map操作 def map[U](f: T => U): RDD[U] = { new RDD(value.map(f)) } def iterator[T] = value.iterator } class SparkContext { //創建一個RDD def createRDD(): RDD[Integer] = new RDD[Integer](Seq(1, 2, 3)) }
二、簡單實現java版本的RDD和SparkContext
//這個時java中的一個接口 //我們可以將scala中的map需要的函數其實就是對應著java中的一個接口 package com.twq.javaapi.java7.function; public interface Function<T1, R> extends Serializable { R call(T1 v1) throws Exception; } //這邊實現的java版的RDD和SparkContext其實還是用scala代碼實現,只不過這些scala代碼可以被java代碼調用了 import java.util.{Iterator => JIterator} import scala.collection.JavaConverters._ import com.twq.javaapi.java7.function.{Function => JFunction} //每一個JavaRDD都會含有一個scala的RDD,用於調用該RDD的api class JavaRDD[T](val rdd: RDD[T]) { def map[R](f: JFunction[T, R]): JavaRDD[R] = //這裏是關鍵,調用scala RDD中的map方法 //我們將java的接口構造成scala RDD的map需要的函數函數 new JavaRDD(rdd.map(x => f.call(x))) //我們需要將scala的Iterator轉成java版的Iterator def iterator: JIterator[T] = rdd.iterator.asJava } //每個JavaSparkContext含有一個scala版本的SparkContext class JavaSparkContext(sc: SparkContext) { def this() = this(new SparkContext()) //轉調scala版本的SparkContext來實現JavaSparkContext的功能 def createRDD(): JavaRDD[Integer] = new JavaRDD[Integer](sc.createRDD()) }
三、寫java代碼調用rdd java api
package com.twq.javaapi.java7; import com.twq.javaapi.java7.function.Function; import com.twq.rdd.api.JavaRDD; import com.twq.rdd.api.JavaSparkContext; import java.util.Iterator; /** * Created by tangweiqun on 2017/9/16. */ public class SelfImplJavaRDDTest { public static void main(String[] args) { //初始化JavaSparkContext JavaSparkContext jsc = new JavaSparkContext(); //調用JavaSparkContext的api創建一個RDD JavaRDD<Integer> firstRDD = jsc.createRDD(); //對創建好的firstRDD應用JavaRDD中的map操作 JavaRDD<String> strRDD = firstRDD.map(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { return v1 + "test"; } }); //將得到的RDD的結果打印,結果為 //1test //2test //3test Iterator<String> result = strRDD.iterator(); while (result.hasNext()) { System.out.println(result.next()); } } }
以上就是RDD java api調用scala api的實現原理,雖然只舉了map操作,但是其他的類似於flatMap操作的實現都是類似的
接下來可以詳細了解RDD java的每一個api了
我們可以參考spark core RDD api來詳細理解scala中的每一個api。。。
spark2.x由淺入深深到底系列六之RDD java api調用scala api的原理