1. 程式人生 > >HDPCD-Java-複習筆記(18)

HDPCD-Java-複習筆記(18)

Java lab booklet

Understanding Pig

root@<hostname>:~/java/labs/demos# pig

grunt> copyFromLocal /root/java/labs/demos/pigdemo.txt demos/

grunt> cd demos

Define the employees relation, using a schema:

grunt> employees = LOAD 'pigdemo.txt'  AS (state,name);

grunt> describe employees;

employees: {state:bytearray,name: bytearray}

grunt> DUMP employees;

·        (SD,Rich)

·        (NV,Barry)

·        (CO,George)

·        (CA,Ulf)

·        (IL,Danielle)

·        (OH,Tom)

·        (CA,manish)

·        (CA,Brian)

·        (CO,Mark)

grunt> emp_group = GROUP employees BY state;

grunt> describe emp_group;

emp_group: {group:bytearray,employees: {(state: bytearray,name: bytearray)}}

grunt> DUMP emp_group;

·        The output is:

·        (CA,{(CA,Ulf),(CA,manish),(CA,Brian)})

·        (CO,{(CO,George),(CO,Mark)})

·        (IL,{(IL,Danielle)})

·        (NV,{(NV,Barry)})

·        (OH,{(OH,Tom)})

·        (SD,{(SD,Rich)})

grunt> STORE emp_group INTO 'emp_group_csv' USING PigStorage(',')

;

grunt> cat emp_group_csv/part-r-00000
CA,{(CA,Brian),(CA,manish),(CA,Ulf)}
CO,{(CO,Mark),(CO,George)}
IL,{(IL,Danielle)}
NV,{(NV,Barry)}
OH,{(OH,Tom)}
SD,{(SD,Rich)}

The aliases command shows a list of currently defined aliases:

grunt> aliases
aliases: [1-41, ca_only, 1-39, emp_group, employees]

package stockudfs;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
 * Pig UDF that computes a running On-Balance Volume (OBV) value.
 *
 * <p>Each call receives a tuple of (volume, close). It compares the close with the close seen
 * on the previous call:
 * <ul>
 *   <li>higher close: the volume is added to the running total;</li>
 *   <li>lower close: the volume is subtracted;</li>
 *   <li>equal close: the total is unchanged.</li>
 * </ul>
 * Before the first call the previous close is 0, so the first row with a positive close starts
 * the total at its own volume.
 *
 * <p>The running total lives in instance fields and carries over between calls. Rows must
 * therefore reach a single instance of this UDF in chronological order. NOTE(review): the
 * accompanying script orders by date first; confirm that no other caller relies on per-call
 * independence.
 */
public class OnBalanceVolume extends EvalFunc<Long> {
	/** OBV value returned for the previous row (0 before the first row). */
	private long previousObv = 0;
	/** Closing price of the previous row (0 before the first row). */
	private double previousClose = 0;

	/**
	 * Updates and returns the running OBV for one row.
	 *
	 * @param input tuple whose field 0 is the volume and field 1 the closing price
	 * @return the OBV after this row, or {@code null} if the tuple or either field is missing
	 * @throws IOException if a field cannot be read from the tuple
	 */
	@Override
	public Long exec(Tuple input) throws IOException {
		// Missing data yields null instead of a NullPointerException that would fail the task.
		if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
			return null;
		}
		long volume = Long.parseLong(input.get(0).toString());
		double currentClose = Double.parseDouble(input.get(1).toString());
		long obv;
		if (currentClose > previousClose) {
			obv = previousObv + volume;
		} else if (currentClose < previousClose) {
			obv = previousObv - volume;
		} else {
			obv = previousObv;
		}
		// Carry state forward; without this every row is compared against the initial 0 values
		// and the result degenerates to the row's own volume.
		previousObv = obv;
		previousClose = currentClose;
		return obv;
	}

}
stockvolume.pig

-- Computes the running On-Balance Volume (OBV) for one stock symbol.
-- The symbol is supplied via parameter substitution, e.g.:
--   pig -param symbol=AVA stockvolume.pig
register stockudfs.jar;

stockdata = LOAD 'stocksA' using PigStorage(',') AS (exchange:chararray,symbol:chararray,
date:chararray,open:float,high:float,low:float,close:float,volume:int);

stock_all = FOREACH stockdata GENERATE symbol,date,close,volume;

stock_filter = FILTER stock_all BY symbol == '$symbol';

-- OnBalanceVolume keeps a running total between calls, so all rows must be
-- processed by one task in date order: PARALLEL 1 forces a single reducer.
stock_sorted = ORDER stock_filter BY date ASC PARALLEL 1;


obv_result = FOREACH stock_sorted GENERATE symbol, date, stockudfs.OnBalanceVolume(volume, close) AS obv;

dump obv_result;

-- Pig comments start with "--" (or use /* ... */); "//" is a syntax error.
-- STORE obv_result INTO 'obv_result';

結果輸出:

(AVA,2009-12-02,245700)
(AVA,2009-12-03,426000)
(AVA,2009-12-04,262200)
(AVA,2009-12-07,131300)
(AVA,2009-12-08,190900)
(AVA,2009-12-09,138700)
(AVA,2009-12-10,217300)
(AVA,2009-12-11,139200)
(AVA,2009-12-14,165500)
(AVA,2009-12-15,311900)
(AVA,2009-12-16,177800)
(AVA,2009-12-17,177900)
(AVA,2009-12-18,532500)
(AVA,2009-12-21,177100)

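Lab: Writing an Accumulator User Defined Function (UDF)
A Pig script that outputs a stock’s highest closing price, along with the closing prices that immediately follow it. The window size passed to the UDF (4 in highclose.pig) counts the highest close itself, so up to 3 following prices are shown.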

package stockudfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
 * Accumulator UDF that finds the highest closing price in a bag of stock rows. It returns that
 * price together with the closing prices of the rows that follow it, up to {@code windowSize}
 * entries in total (the high itself counts as the first entry).
 *
 * <p>Each bag tuple is read positionally:
 * <ul>
 *   <li>field 1 is the date;</li>
 *   <li>field 2 is the closing price.</li>
 * </ul>
 * "Following" means later in bag iteration order. The accompanying script sorts the bag by date
 * before calling the UDF.
 *
 * <p>Output: one {@code "<date> <close>\n"} line per entry in the window, starting with the high.
 */
public class HighestClosingPriceWindow extends AccumulatorEvalFunc<String> {
	/** Maximum number of entries kept, including the highest close itself. */
	private int windowSize;
	/** Number of entries currently in the window; always equals {@code highCloses.size()}. */
	private int accumulatedPrices;
	/** Dates of the window entries, parallel to {@link #highCloses}. */
	private List<String> highDates;
	/** Closing prices of the window entries; index 0 is the highest close. */
	private List<Float> highCloses;
	/** Highest close seen so far; only meaningful while {@code highCloses} is non-empty. */
	private float highClose;

	/**
	 * Creates the UDF with the given window size.
	 *
	 * @param size window size as a decimal string; values below 1 are treated as 1
	 * @throws NumberFormatException if {@code size} is not an integer
	 */
	public HighestClosingPriceWindow(String size) {
		int winSize = Integer.parseInt(size);
		windowSize = Math.max(winSize, 1);
		highDates = new ArrayList<String>();
		highCloses = new ArrayList<Float>();
	}

	/**
	 * Folds one chunk of the input bag into the current window.
	 *
	 * @param b tuple whose field 0 is the bag of (symbol, date, close) rows
	 * @throws IOException if a field cannot be read from a tuple
	 */
	@Override
	public void accumulate(Tuple b) throws IOException {
		DataBag values = (DataBag) b.get(0);
		if (values == null) {
			return;
		}
		Iterator<Tuple> iterator = values.iterator();
		while (iterator.hasNext()) {
			Tuple tuple = iterator.next();
			Object closeField = tuple.get(2);
			if (closeField == null) {
				// A row without a closing price cannot be ranked or reported.
				continue;
			}
			float currentClose = Float.parseFloat(closeField.toString());
			String date = String.valueOf(tuple.get(1));
			if (highCloses.isEmpty() || currentClose > highClose) {
				// New maximum: drop the previous window so stale rows are not reported.
				highClose = currentClose;
				highCloses.clear();
				highDates.clear();
				highCloses.add(currentClose);
				highDates.add(date);
				accumulatedPrices = 1;
			} else if (accumulatedPrices < windowSize) {
				highCloses.add(currentClose);
				highDates.add(date);
				accumulatedPrices++;
			}
		}
	}

	/** Resets all per-group state so the instance can process the next group. */
	@Override
	public void cleanup() {
		highClose = 0;
		highCloses.clear();
		highDates.clear();
		accumulatedPrices = 0;
	}

	/**
	 * Renders the current window.
	 *
	 * @return one {@code "<date> <close>\n"} line per entry, highest close first; empty if no
	 *     rows were accumulated
	 */
	@Override
	public String getValue() {
		StringBuilder builder = new StringBuilder();
		for (int i = 0; i < highCloses.size(); i++) {
			builder.append(highDates.get(i)).append(' ').append(highCloses.get(i)).append('\n');
		}
		return builder.toString();
	}

}
highclose.pig

-- For each symbol, reports the highest closing price and the closes that follow it,
-- using the HighestClosingPriceWindow accumulator UDF.
register stockudfs.jar;

-- '4' is the window size handed to the UDF constructor (as a string).
define HighestClosingPriceWindow stockudfs.HighestClosingPriceWindow('4');

prices = LOAD 'stocksA' USING PigStorage(',') AS (
    exchange:chararray,
    symbol:chararray,
    date:chararray,
    open:float,
    high:float,
    low:float,
    close:float,
    volume:int
);

-- The UDF reads field 1 as the date and field 2 as the close, so keep this column order.
closes = FOREACH prices GENERATE symbol, date, close;
by_symbol = GROUP closes BY symbol;
peak_windows = FOREACH by_symbol {
    -- ISO yyyy-mm-dd strings sort chronologically as plain text.
    chronological = ORDER closes BY date ASC;
    GENERATE group AS symbol, HighestClosingPriceWindow(chronological) AS result;
}
DUMP peak_windows;


結果:

(AVT,2000-04-28 78.62
2000-05-01 78.44
2000-05-02 78.31
2000-05-03 74.06
2000-04-27 76.25
2000-04-26 74.19
2000-04-25 73.44
1997-12-08 72.16)
(AXE,2007-07-24 86.11
2007-07-25 83.8
2007-07-26 82.25
2007-07-27 81.3)