spark1.x和2.xIterable和iterator相容問題
阿新 • • 發佈:2018-11-19
1. spark 1.x 升級到spark 2.x 對於普通的spark來說,變動不大 : 1 舉一個最簡單的例項: spark1.x public static JavaRDD<String> workJob(JavaRDD<String> spark1Rdd) { JavaPairRDD<String, Integer> testRdd = spark1Rdd .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterable<Tuple2<String, Integer>> call(String str) throws Exception { ArrayList<Tuple2<String, Integer>> list = new ArrayList<>(); return list; } }); return spark1Rdd; } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 spark2.x public static JavaRDD<String> workJob(JavaRDD<String> spark2Rdd) { JavaPairRDD<String, Integer> testRdd2 = spark2Rdd .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterator<Tuple2<String, Integer>> call(String str) throws Exception { ArrayList<Tuple2<String, Integer>> list = new ArrayList<>(); return list.iterator(); } }); return spark2Rdd; } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 需要說明的是: 上面的返回的rdd就直接用輸入的 RDD顯然是不合理的! 只是為了用最簡潔的方式介紹程式碼的轉換而已! 可以看到 : 區別主要在於 1. spark 1.x中的Iterable物件 變成了 spark2.x中的Iterator物件 2. 相應的,對於返回值為list的RDD, spark2.x中要返回list.iterator(); 1 2 3 還是很簡單的吧 問題在於 : 如果你有幾個spark程式要執行在不同的環境下,(有的現場用1.x,有的現場用2.x) 你需要同時維護兩種不同版本的spark,是不是耗時又耗力呢? 這個時候就需要考慮到 spark版本的相容性,使你的程式能成功的執行在各種叢集環境下 2. spark版本的相容 寫一個簡單的工具類如下 : import java.util.Iterator; public class MyIterator<T> implements Iterator, Iterable { private Iterator myIterable; public MyIterator(Iterable iterable) { myIterable = iterable.iterator(); } @Override public boolean hasNext() { return myIterable.hasNext(); } @Override public Object next() { return myIterable.next(); } @Override public void remove() { myIterable.remove(); } @Override public Iterator iterator() { return myIterable; } } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 只需要進行如上設計就可以實現版本的相容了 那麼應該如何應用呢? JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() { @Override public MyIterator<String> call(String s) throws Exception { String[] split = s.split("\\s+"); MyIterator myIterator = new MyIterator(Arrays.asList(split)); return myIterator; } });