HDPCD-Java-複習筆記(18)
Java lab booklet
Understanding Pig
root@<hostname>:~/java/labs/demos# pig
grunt> copyFromLocal /root/java/labs/demos/pigdemo.txt demos/
grunt> cd demos
Define the employees relation, using a schema:
grunt> employees = LOAD 'pigdemo.txt' AS (state,name);
grunt> describe employees;
employees: {state:bytearray,name: bytearray}
grunt> DUMP employees;
(SD,Rich)
(NV,Barry)
(CO,George)
(CA,Ulf)
(IL,Danielle)
(OH,Tom)
(CA,manish)
(CA,Brian)
(CO,Mark)
grunt> emp_group = GROUP employees BY state;
grunt> describe emp_group;
emp_group: {group:bytearray,employees: {(state: bytearray,name: bytearray)}}
grunt> DUMP emp_group;
The output is:
(CA,{(CA,Ulf),(CA,manish),(CA,Brian)})
(CO,{(CO,George),(CO,Mark)})
(IL,{(IL,Danielle)})
(NV,{(NV,Barry)})
(OH,{(OH,Tom)})
(SD,{(SD,Rich)})
grunt> STORE emp_group INTO 'emp_group_csv' USING PigStorage(',');
grunt> cat emp_group_csv/part-r-00000
CA,{(CA,Brian),(CA,manish),(CA,Ulf)}
CO,{(CO,Mark),(CO,George)}
IL,{(IL,Danielle)}
NV,{(NV,Barry)}
OH,{(OH,Tom)}
SD,{(SD,Rich)}
The aliases command shows a list of currently defined aliases:
grunt> aliases
aliases: [1-41, ca_only, 1-39, emp_group, employees]
package stockudfs;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
/**
 * Pig eval UDF that computes a running On-Balance Volume (OBV) value.
 *
 * <p>Each call receives one trading day as {@code (volume, close)} and returns the OBV after
 * that day:
 * <ul>
 *   <li>the previous OBV plus the day's volume when the close rose;</li>
 *   <li>minus the volume when it fell;</li>
 *   <li>unchanged when it was flat.</li>
 * </ul>
 * Before the first row the previous close is 0, so the first row with a positive close adds
 * its full volume.
 *
 * <p>The running total is kept in instance fields. Results are therefore only meaningful when
 * the rows of one series reach the same UDF instance in date order. NOTE(review): the calling
 * script sorts with ORDER BY before invoking this UDF. Confirm that the sorted rows are not
 * split across several tasks, because each task would start its own running total.
 */
public class OnBalanceVolume extends EvalFunc<Long> {
    /** OBV returned for the previous row; 0 before the first row. */
    private long previousObv = 0;
    /** Closing price of the previous row; 0 before the first row. */
    private double previousClose = 0;

    /**
     * Advances the running OBV by one trading day.
     *
     * @param input tuple whose field 0 is the day's volume and field 1 its closing price; both
     *     are read through {@code toString()} and parsed as numbers
     * @return the OBV after this day, or {@code null} when the input or one of its two fields is
     *     missing (such rows leave the running state untouched)
     * @throws IOException if reading a tuple field fails
     * @throws NumberFormatException if a field's text is not a number
     */
    @Override
    public Long exec(Tuple input) throws IOException {
        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
            return null;
        }
        long volume = Long.parseLong(input.get(0).toString());
        double currentClose = Double.parseDouble(input.get(1).toString());

        long obv;
        if (currentClose > previousClose) {
            obv = previousObv + volume;
        } else if (currentClose < previousClose) {
            obv = previousObv - volume;
        } else {
            obv = previousObv;
        }

        // Carry this day forward so the next call compares against it. Without this the
        // state stays at 0 and every row would simply return +volume.
        previousObv = obv;
        previousClose = currentClose;
        return obv;
    }
}
stockvolume.pig
-- stockvolume.pig: compute the running On-Balance Volume (OBV) for a single stock.
-- The stock is chosen with Pig parameter substitution (the "symbol" parameter), e.g.:
--   pig -param symbol=AVA stockvolume.pig
register stockudfs.jar;
stockdata = LOAD 'stocksA' using PigStorage(',') AS (exchange:chararray,symbol:chararray,
date:chararray,open:float,high:float,low:float,close:float,volume:int);
stock_all = FOREACH stockdata GENERATE symbol,date,close,volume;
stock_filter = FILTER stock_all BY symbol == '$symbol';
-- OnBalanceVolume keeps a running total between calls, so rows must reach it in date order.
-- NOTE(review): the UDF's state is per instance; if this ordered relation is processed by
-- several tasks, each one restarts the total. Confirm this step runs in a single task.
stock_sorted = ORDER stock_filter BY date ASC;
-- Argument order matters: the UDF reads field 0 as the volume and field 1 as the close.
obv_result = FOREACH stock_sorted GENERATE symbol, date, stockudfs.OnBalanceVolume(volume, close) AS obv;
dump obv_result;
-- Pig Latin line comments start with "--". Uncomment the next line to save the result
-- instead of only dumping it.
-- STORE obv_result INTO 'obv_result';
結果輸出:
(AVA,2009-12-02,245700)
(AVA,2009-12-03,426000)
(AVA,2009-12-04,262200)
(AVA,2009-12-07,131300)
(AVA,2009-12-08,190900)
(AVA,2009-12-09,138700)
(AVA,2009-12-10,217300)
(AVA,2009-12-11,139200)
(AVA,2009-12-14,165500)
(AVA,2009-12-15,311900)
(AVA,2009-12-16,177800)
(AVA,2009-12-17,177900)
(AVA,2009-12-18,532500)
(AVA,2009-12-21,177100)
Lab:
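Writing an Accumulator User Defined Function (UDF)
A Pig script that outputs a stock's highest closing price, followed by the closing prices of the trading days immediately after it. The window size passed to the UDF (4 in highclose.pig) counts the highest close itself, so the script reports the highest close plus the next 3 closes.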
package stockudfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
/**
 * Pig accumulator UDF that finds the highest closing price in a bag of daily quotes. It
 * reports that close together with the closes that immediately follow it.
 *
 * <p>For each input tuple, field 1 is read as the date and field 2 as the closing price. The
 * calling script passes {@code (symbol, date, close)} tuples sorted by date, so "following"
 * means the next trading days.
 *
 * <p>The result contains one line per window entry, formatted as {@code "<date> <close>\n"} and
 * starting with the highest close. The window holds at most {@code windowSize} entries,
 * counting the highest close itself. When a later close beats the current high, the window
 * restarts from that close.
 */
public class HighestClosingPriceWindow extends AccumulatorEvalFunc<String> {
    /** Maximum number of entries reported, including the highest close; always at least 1. */
    private final int windowSize;
    /** Dates of the current window, highest close first. */
    private final List<String> highDates = new ArrayList<String>();
    /** Closing prices of the current window, parallel to {@link #highDates}. */
    private final List<Float> highCloses = new ArrayList<Float>();
    /** Highest close seen for the current key; only meaningful while the window is non-empty. */
    private float highClose;

    /**
     * Creates the UDF with the given window size.
     *
     * @param size window size as a decimal string (the value given in the script's DEFINE);
     *     values below 1 are treated as 1
     * @throws NumberFormatException if {@code size} is not an integer
     */
    public HighestClosingPriceWindow(String size) {
        windowSize = Math.max(Integer.parseInt(size), 1);
    }

    /**
     * Consumes one batch of the current key's tuples. The batch arrives as a bag in field 0.
     *
     * @param b tuple whose field 0 is a bag of quote tuples
     * @throws IOException if reading a tuple field fails
     * @throws NumberFormatException if a close field's text is not a number
     */
    @Override
    public void accumulate(Tuple b) throws IOException {
        DataBag values = (b == null) ? null : (DataBag) b.get(0);
        if (values == null) {
            return;
        }
        for (Tuple tuple : values) {
            if (tuple == null || tuple.get(2) == null) {
                continue; // no closing price to compare, so the row cannot affect the window
            }
            float currentClose = Float.parseFloat(tuple.get(2).toString());
            String date = String.valueOf(tuple.get(1));

            if (highCloses.isEmpty() || currentClose > highClose) {
                // New high: discard the old window entirely. Otherwise entries that belonged
                // to an earlier, lower high would be reported after this one.
                highClose = currentClose;
                highCloses.clear();
                highDates.clear();
                highCloses.add(currentClose);
                highDates.add(date);
            } else if (highCloses.size() < windowSize) {
                highCloses.add(currentClose);
                highDates.add(date);
            }
        }
    }

    /** Resets all per-key state so the next key starts with an empty window. */
    @Override
    public void cleanup() {
        highClose = 0;
        highCloses.clear();
        highDates.clear();
    }

    /**
     * Formats the current window.
     *
     * @return one {@code "<date> <close>\n"} line per window entry, or an empty string when no
     *     close was seen
     */
    @Override
    public String getValue() {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < highCloses.size(); i++) {
            builder.append(highDates.get(i)).append(' ').append(highCloses.get(i)).append('\n');
        }
        return builder.toString();
    }
}
highclose.pig
-- highclose.pig: for every stock symbol, report the highest closing price and the closes
-- that follow it, using the HighestClosingPriceWindow accumulator UDF from stockudfs.jar.
register stockudfs.jar;
-- The constructor argument is the window size, given as a string. It counts the highest
-- close itself, so '4' reports the high plus up to 3 following closes.
define HighestClosingPriceWindow stockudfs.HighestClosingPriceWindow('4');
-- Comma-separated daily quotes: exchange, symbol, date, open, high, low, close, volume.
stockdata = LOAD 'stocksA' using PigStorage(',') AS (
exchange:chararray,
symbol:chararray,
date:chararray,
open:float,
high:float,
low:float,
close:float,
volume:int
);
-- Keep only what the UDF needs; it reads field 1 as the date and field 2 as the close.
stocks_all = FOREACH stockdata GENERATE symbol, date, close;
stocks_group = GROUP stocks_all BY symbol;
stocks_high = FOREACH stocks_group {
-- Sort each symbol's rows by date so "following" means the next trading days.
-- date is a chararray, so this is a string sort. It matches chronological order for
-- zero-padded YYYY-MM-DD values such as those in the sample output.
sorted = ORDER stocks_all BY date ASC;
GENERATE group as symbol, HighestClosingPriceWindow(sorted) as result;
}
dump stocks_high;
結果:
(AVT,2000-04-28 78.62
2000-05-01 78.44
2000-05-02 78.31
2000-05-03 74.06
2000-04-27 76.25
2000-04-26 74.19
2000-04-25 73.44
1997-12-08 72.16)
(AXE,2007-07-24 86.11
2007-07-25 83.8
2007-07-26 82.25
2007-07-27 81.3)