《資料演算法-Hadoop/Spark大資料處理技巧》讀書筆記(四)——移動平均
阿新 • • 發佈:2019-01-01
移動平均:對時序序列按週期取其值的平均值,這種運算被稱為移動平均。典型例子是求股票的n天內的平均值。
移動平均的關鍵是如何求這個平均值,可以使用Queue來實現。
public class MovingAverageDriver {
public static void main(String[] args){
SparkConf conf = new SparkConf().setAppName("MovingAverageDriver");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs://hadoop000:8020/dataalgorithminput/MovingAverage.txt" );
List<String> list =
lines.mapToPair(new PairFunction<String, CodeDatePair, DatePricePair>() {
@Override
public Tuple2<CodeDatePair, DatePricePair> call(String line) throws Exception {
String[] array = line.split(",");
return new Tuple2<CodeDatePair, DatePricePair>(new CodeDatePair(array[0],array[1]),new DatePricePair(array[1],array[2]));
}
}).repartitionAndSortWithinPartitions(new CodeDatePartitioner())
.mapToPair(new PairFunction<Tuple2<CodeDatePair,DatePricePair>, String, DatePricePair>() {
@Override
public Tuple2<String, DatePricePair> call(Tuple2<CodeDatePair, DatePricePair> tuple2) throws Exception {
return new Tuple2<String, DatePricePair>(tuple2._1().getCode(), tuple2._2());
}
})
.groupByKey()
.flatMapValues(new Function<Iterable<DatePricePair>, Iterable<Tuple2<DatePricePair, Double>>>(){
@Override
public Iterable<Tuple2<DatePricePair, Double>> call(Iterable<DatePricePair> v1) throws Exception {
int size = 3;
List<Tuple2<DatePricePair, Double>> list = new ArrayList<Tuple2<DatePricePair, Double>>();
Queue<Integer> queue= new LinkedList<Integer>();
for(DatePricePair item : v1){
queue.offer(item.getPrice());
if(queue.size() > size){
queue.remove();
}
int sum = 0;
for(Integer price : queue){
sum += price;
}
list.add(new Tuple2<DatePricePair, Double>(item, Double.valueOf(sum)/Double.valueOf(queue.size())));
}
return list;
}
}).map(new Function<Tuple2<String,Tuple2<DatePricePair,Double>>, String>() {
@Override
public String call(Tuple2<String, Tuple2<DatePricePair, Double>> v1) throws Exception {
return v1._1() + " " + v1._2()._1().getDate() + " " + v1._2()._1().getPrice() + " " + v1._2()._2();
}
}).collect();
for(String item : list){
System.out.println(item);
}
}
}
①對一隻股票按照date來排序
②對資料集根據股票的Code進行groupByKey操作,生成的value是按照時間增大順序排列的date-price對
③在flatMapValues()裡使用Queue實現對連續3天的股價求平均值,並展開。