spark 平行計算 前n項和
阿新 • • 發佈:2019-02-01
在單執行緒計算中前n項和計算一直沒有障礙,隨著資料量的膨脹,單執行緒計算已經無法滿足資料計算。計算逐漸被遷移到spark或者hadoop叢集上平行計算,但是無論spark還是hadoop平行計算前n項和一直是一個痛點,只能做到每個結點或者容器上的前N項和,卻無法做到計算全域性前N項和。
現提供一種解決方案,希望大家多多指正。計算過程需要兩次便利全部資料。第一次遍歷計算每個容器中資料加和結果,並返回paritition的id和容器中資料家和。第二次遍歷才計算前Nx項和的家和。現有java版本實現,如需要scala版本或者python版本實現請私信本人。
public void sum(){ SparkConf conf = new SparkConf().setMaster("local").setAppName("temp"); JavaSparkContext ctx = new JavaSparkContext(conf); List<Integer> list = Arrays.asList(1,2,3,4,5,6,7); JavaRDD<Integer> soureceRdd = ctx.parallelize(list,4).cache(); List<Tuple2<Integer, Integer>> partitionSub = soureceRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Tuple2<Integer, Integer>>>() { private static final long serialVersionUID = 1L; @Override public Iterator<Tuple2<Integer, Integer>> call(Integer partitionId, Iterator<Integer> v2) throws Exception { Integer result = 0; while(v2.hasNext()){ result += v2.next(); } return Arrays.asList(new Tuple2<Integer, Integer>(partitionId,result)).iterator(); } }, true).collect(); Map<Integer, Integer> paritionSum = this.sumPriPartition(partitionSub); JavaRDD<Integer> x = soureceRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() { private static final long serialVersionUID = 1L; @Override public Iterator<Integer> call(Integer v1, Iterator<Integer> v2) throws Exception { List<Integer> result = new CopyOnWriteArrayList<Integer>(); Integer proPartitionSum = paritionSum.get(v1); while(v2.hasNext()){ proPartitionSum+=v2.next(); result.add(proPartitionSum); } return result.iterator(); } }, true); } /*結果<partitionId,當前partition之前所有partition資料和>*/ public Map<Integer, Integer> sumPriPartition(List<Tuple2<Integer, Integer>> list){ Map<Integer, Integer> map = new HashMap<Integer, Integer>(); Integer caluer = 0; for(Tuple2<Integer, Integer> tuple: list){ Integer partitionId = tuple._1; map.put(partitionId, caluer); caluer+=tuple._2; } return map; }