Implementing RDD operators in Java
An introduction to Spark basics and the Java API.
textFile: converts a local file or an HDFS file into an RDD. To read a local file, the file must exist on every node, or it must be shared to the nodes over the network.

    JavaRDD<String> lines = sc.textFile(uri, 1);

union: merges two RDDs.

    JavaRDD<String> data1 = sc.textFile("E:\\1.txt");
    JavaRDD<String> data2 = sc.textFile("E:\\2.txt");
    JavaRDD<String> union = data1.union(data2);

saveAsTextFile: saves the result to HDFS.

    counts.saveAsTextFile(args[1]);
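All of the snippets in this post assume an already-created JavaSparkContext named sc. As a minimal sketch (the application name and the local master URL are placeholder assumptions; on a cluster the master is normally supplied by spark-submit), it could be set up like this:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // placeholder app name and master for a local run
    SparkConf conf = new SparkConf().setAppName("RddExamples").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);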
map: JavaRDD<T> -> JavaRDD<U>

    JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
        public Integer call(String s) {
            return s.length();
        }
    });
    // JavaRDD<String> -> JavaRDD<List<String>>
    JavaRDD<List<String>> transactions = data.map(new Function<String, List<String>>() {
        public List<String> call(String line) {
            String[] parts = line.split(" ");
            return Arrays.asList(parts);
        }
    });
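Since Function is a single-method interface, the same map calls can be written more compactly with Java 8 lambdas; a sketch assuming the same lines and data RDDs as above:

    JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
    JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));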
reduce: JavaRDD<T> -> T

    int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    });
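reduce is an action: it combines all the elements with the supplied function (which should be associative and commutative) and returns the final value to the driver. As another small sketch using the lineLengths RDD above, the longest line length could be computed the same way:

    int maxLength = lineLengths.reduce((a, b) -> Math.max(a, b));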
flatMap: JavaRDD<T> -> JavaRDD<U>  Splits each input record into multiple output records using the function you define.

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            return Arrays.asList(SPACE.split(s));
        }
    });

    // equivalent version that splits on a literal space
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            String[] words = s.split(" ");
            return Arrays.asList(words);
        }
    });
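The first snippet relies on a SPACE constant that the post does not show; presumably it is a precompiled java.util.regex.Pattern along these lines. Note also that from Spark 2.x onward FlatMapFunction.call returns an Iterator rather than an Iterable, so on newer versions the example needs a small change (shown here with a lambda):

    // assumed definition of the SPACE pattern used above
    private static final Pattern SPACE = Pattern.compile(" ");

    // Spark 2.x signature: the function must return an Iterator
    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());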
mapToPair: JavaRDD<T> -> JavaPairRDD<K, V>  PairFunction<T, K, V>: T is the input type; K and V are the types of the output key/value pair.

    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
        // scala.Tuple2<K, V> call(T t)
        // Tuple2 is a Scala class; call takes one input of type T, i.e. a word s,
        // and returns a new Tuple2 whose key is that word and whose count is 1
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
reduceByKey: JavaPairRDD<K, V> -> JavaPairRDD<K, V>  Takes a Function2<T1, T2, R> object, which accepts two arguments of types T1 and T2 and returns R. If ones contains <"one", 1> and <"one", 1>, the pairs sharing the key "one" are combined and their counts summed: the inputs are Integer, the output is Integer, and the result is <"one", 2>.

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });
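Putting the operators above together, a complete word count might look roughly like the following self-contained sketch (written for Spark 2.x with Java 8 lambdas; the input path and app name are placeholder assumptions):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class WordCount {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<String> lines = sc.textFile("E:\\1.txt");        // placeholder input path
            JavaRDD<String> words = lines.flatMap(
                    s -> Arrays.asList(s.split(" ")).iterator());     // one record per word
            JavaPairRDD<String, Integer> ones = words.mapToPair(
                    s -> new Tuple2<>(s, 1));                         // (word, 1) pairs
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(
                    (i1, i2) -> i1 + i2);                             // sum the counts per word

            for (Tuple2<String, Integer> t : counts.collect()) {
                System.out.println(t._1() + ": " + t._2());
            }
            sc.stop();
        }
    }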
collect: JavaRDD<T> -> List<T>  Returns a list containing every element of the RDD (for a pair RDD, a list of Tuple2 objects).

    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?, ?> tuple : output)
        System.out.println(tuple._1() + ": " + tuple._2());

    List<String> line = lines.collect();
    for (String val : line)
        System.out.println(val);

    for (Tuple2<String, Iterable<String>> tuple : instruction_sets.collect()) {
        System.out.println(tuple._1());
        Iterator<String> it = tuple._2().iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        System.out.println();
    }
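Because collect materialises the whole RDD in the driver's memory, it is only suitable for small results; for a quick look at a large RDD, take returns just the first n elements. A sketch, assuming the counts RDD above:

    // pull only the first 10 pairs back to the driver
    for (Tuple2<String, Integer> t : counts.take(10)) {
        System.out.println(t._1() + ": " + t._2());
    }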
filter: JavaRDD<T> -> JavaRDD<T>  Define a function that returns a Boolean; when Spark runs filter, it drops the records for which the function returns false.

    JavaRDD<String> contaninsE = lines.filter(new Function<String, Boolean>() {
        public Boolean call(String s) throws Exception {
            return (s.contains("they"));
        }
    });
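Like map and flatMap, filter is a lazy transformation; nothing is computed until an action runs. A small sketch, assuming the same lines RDD, that counts the matching records instead of keeping them:

    long matches = lines.filter(s -> s.contains("they")).count();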
sortByKey: JavaPairRDD<K, V> -> JavaPairRDD<K, V>  Sorts the RDD records by key, in ascending lexicographic order by default.

    JavaPairRDD<String, Integer> sort = counts.sortByKey();
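sortByKey also accepts a boolean for the sort direction, and sorting by value is usually done by swapping key and value first; a sketch assuming the counts pair RDD above:

    // descending instead of the default ascending order
    JavaPairRDD<String, Integer> sortedDesc = counts.sortByKey(false);

    // sort by count rather than by word: swap key and value, then sort
    JavaPairRDD<Integer, String> byCount = counts
            .mapToPair(t -> new Tuple2<Integer, String>(t._2(), t._1()))
            .sortByKey(false);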
groupByKey: JavaPairRDD<K, V> -> JavaPairRDD<K, Iterable<V>>  groupByKey() groups the data by the keys of the RDD. For an RDD made up of keys of type K and values of type V, the resulting RDD has type [K, Iterable[V]]. In the small example at the end of this post, calling groupByKey() on the pair RDD returns a result of type JavaPairRDD<Integer, Iterable<Integer>>, so all values sharing a key can be traversed with an Iterator.

    JavaPairRDD<String, Iterable<String>> instruction_sets = user_instruction.groupByKey();

    // user -> instruction set: collect each user's instructions and remove duplicate elements
    JavaRDD<List<String>> transactions = instruction_sets.map(
            new Function<Tuple2<String, Iterable<String>>, List<String>>() {
                @Override
                public List<String> call(Tuple2<String, Iterable<String>> sets) throws Exception {
                    // convert the Iterable to a List
                    List<String> list = new ArrayList<String>();
                    for (String s : sets._2) list.add(s);
                    // remove duplicate elements
                    HashSet<String> set = new HashSet<String>(list);
                    ArrayList<String> listWithoutDuplicateElements = new ArrayList<String>(set);
                    return listWithoutDuplicateElements;
                }
            });
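If the grouping is only needed to remove duplicate values per key, as in the example above, a simpler alternative (a sketch, assuming the same user_instruction pair RDD) is to call distinct() on the pairs before grouping, so the hand-written HashSet step is not needed:

    // duplicate (user, instruction) pairs are removed before grouping
    JavaPairRDD<String, Iterable<String>> dedupedSets = user_instruction.distinct().groupByKey();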
A small example:

    List<Tuple2<Integer, Integer>> list1 = new ArrayList<Tuple2<Integer, Integer>>();
    list1.add(new Tuple2<Integer, Integer>(1, 1));
    list1.add(new Tuple2<Integer, Integer>(2, 2));
    list1.add(new Tuple2<Integer, Integer>(1, 3));
    list1.add(new Tuple2<Integer, Integer>(2, 4));
    JavaPairRDD<Integer, Integer> nums1 = sc.parallelizePairs(list1);
    JavaPairRDD<Integer, Iterable<Integer>> results = nums1.groupByKey();
    // iterate over results; note how the Iterable is traversed
    for (Tuple2<Integer, Iterable<Integer>> tuple : results.collect()) {
        System.out.print(tuple._1() + ": ");
        Iterator<Integer> it = tuple._2().iterator();
        while (it.hasNext()) {
            System.out.print(it.next() + " ");
        }
        System.out.println();
    }