HDPCD-Java Review Notes (23) - Lab
Java lab booklet
package hbase;

public class StockConstants {
    public static final byte[] PRICE_COLUMN_FAMILY = "p".getBytes();
    public static final byte[] HIGH_QUALIFIER = "high".getBytes();
    public static final byte[] LOW_QUALIFIER = "low".getBytes();
    public static final byte[] CLOSING_QUALIFIER = "close".getBytes();
    public static final byte[] VOLUME_QUALIFIER = "vol".getBytes();
}
package hbase;

import static hbase.StockConstants.CLOSING_QUALIFIER;
import static hbase.StockConstants.HIGH_QUALIFIER;
import static hbase.StockConstants.LOW_QUALIFIER;
import static hbase.StockConstants.PRICE_COLUMN_FAMILY;
import static hbase.StockConstants.VOLUME_QUALIFIER;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockImporter extends Configured implements Tool {

    public static class StockImporterMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private final String COMMA = ",";

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(COMMA);
            // Skip the CSV header row.
            if (words[0].equals("exchange")) {
                return;
            }
            String symbol = words[1];
            String date = words[2];
            double highPrice = Double.parseDouble(words[4]);
            double lowPrice = Double.parseDouble(words[5]);
            double closingPrice = Double.parseDouble(words[6]);
            double volume = Double.parseDouble(words[7]);
            // Row key is the 10-character date followed by the symbol.
            byte[] stockRowKey = Bytes.add(date.getBytes(), symbol.getBytes());
            Put put = new Put(stockRowKey);
            put.add(PRICE_COLUMN_FAMILY, HIGH_QUALIFIER, Bytes.toBytes(highPrice));
            put.add(PRICE_COLUMN_FAMILY, LOW_QUALIFIER, Bytes.toBytes(lowPrice));
            put.add(PRICE_COLUMN_FAMILY, CLOSING_QUALIFIER, Bytes.toBytes(closingPrice));
            put.add(PRICE_COLUMN_FAMILY, VOLUME_QUALIFIER, Bytes.toBytes(volume));
            // TableOutputFormat ignores the output key, so null is fine here.
            context.write(null, put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "StockImportJob");
        job.setJarByClass(StockImporter.class);
        FileInputFormat.setInputPaths(job, new Path("stocksA"));
        job.setMapperClass(StockImporterMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Map-only job: the mapper writes Puts straight to the 'stocks' table.
        job.setNumReduceTasks(0);
        TableMapReduceUtil.initTableReducerJob("stocks", null, job);
        TableMapReduceUtil.addDependencyJars(job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new StockImporter(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(result);
    }
}
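Each price cell above is stored as the raw 8-byte array produced by Bytes.toBytes(double), not as text, which is why the shell later needs a formatter to display the values. A minimal round-trip sketch (the DoubleEncodingDemo class and the sample price are made up for illustration):

package hbase;

import org.apache.hadoop.hbase.util.Bytes;

public class DoubleEncodingDemo {
    public static void main(String[] args) {
        // Bytes.toBytes(double) stores the IEEE-754 bits as an 8-byte big-endian array.
        byte[] encoded = Bytes.toBytes(210.73);
        double decoded = Bytes.toDouble(encoded);
        System.out.println(encoded.length + " bytes, decoded = " + decoded); // 8 bytes, decoded = 210.73
    }
}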
# hbase shell
hbase(main):001:0> create 'stocks', {NAME => 'p', VERSIONS => 1}
hbase(main):002:0> list
hbase(main):003:0> describe 'stocks'
# hadoop fs -mkdir stocksA
# cd ~/java/workspace/HBaseImport/
# hadoop fs -put stocksA/* stocksA/
# export HADOOP_CLASSPATH=`hbase classpath`
# yarn jar hbaseimport.jar
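Once the import job finishes, a single Get from Java is a quick way to confirm rows landed in 'stocks'. A minimal sketch, assuming the same pre-1.0 HTable client API the lab code uses; the StockRowCheck class name and the sample date and symbol are placeholders:

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class StockRowCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "stocks");
        try {
            // Row key layout from StockImporter: 10-character date followed by the symbol.
            byte[] rowKey = Bytes.add("2009-12-31".getBytes(), "AAPL".getBytes());
            Result result = table.get(new Get(rowKey));
            double close = Bytes.toDouble(
                    result.getValue(StockConstants.PRICE_COLUMN_FAMILY, StockConstants.CLOSING_QUALIFIER));
            System.out.println("close = " + close);
        } finally {
            table.close();
        }
    }
}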
package hbasemr;
import static hbasemr.StockConstants.CLOSING_QUALIFIER;
import static hbasemr.StockConstants.DATE_QUALIFIER;
import static hbasemr.StockConstants.INFO_COLUMN_FAMILY;
import static hbasemr.StockConstants.PRICE_COLUMN_FAMILY;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MaxClosingPriceJob extends Configured implements Tool {

    public static class MaxClosingPriceMapper extends TableMapper<Text, Text> {
        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            Cell closingPrice = value.getColumnLatestCell(PRICE_COLUMN_FAMILY, CLOSING_QUALIFIER);
            String keyString = Bytes.toString(key.get());
            // StockImporter builds the row key as the 10-character date followed by the symbol.
            String date = keyString.substring(0, 10);
            String symbol = keyString.substring(10);
            outputKey.set(symbol);
            // Pack the value as the 10-character date immediately followed by the price.
            outputValue.set(date + Bytes.toDouble(CellUtil.cloneValue(closingPrice)));
            context.write(outputKey, outputValue);
        }
    }

    public static class MaxClosingPriceReducer extends TableReducer<Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double max = 0.0;
            String maxDate = "";
            for (Text value : values) {
                String current = value.toString();
                // The date occupies the first 10 characters; the price follows.
                double currentPrice = Double.parseDouble(current.substring(10));
                if (currentPrice > max) {
                    max = currentPrice;
                    maxDate = current.substring(0, 10);
                }
            }
            // Text.getBytes() may return a padded backing array; copy only the valid bytes.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(INFO_COLUMN_FAMILY, CLOSING_QUALIFIER, Bytes.toBytes(max));
            put.add(INFO_COLUMN_FAMILY, DATE_QUALIFIER, Bytes.toBytes(maxDate));
            context.write(key, put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(getConf());
        Job job = Job.getInstance(conf, "MaxClosingPriceJob");
        job.setJarByClass(MaxClosingPriceJob.class);
        TableMapReduceUtil.addDependencyJars(job);
        // Restrict the scan to the only column the mapper needs.
        Scan scan = new Scan();
        scan.addColumn(PRICE_COLUMN_FAMILY, CLOSING_QUALIFIER);
        TableMapReduceUtil.initTableMapperJob("stocks", scan, MaxClosingPriceMapper.class, Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob("stockhighs", MaxClosingPriceReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        int result = 0;
        try {
            result = ToolRunner.run(new Configuration(), new MaxClosingPriceJob(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(result);
    }
}
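The mapper and reducer share an ad-hoc text encoding: each intermediate value is the 10-character date immediately followed by the closing price, so the reducer can split it at index 10. A small illustration of that round trip (PackingDemo and the sample values are made up for this sketch):

public class PackingDemo {
    public static void main(String[] args) {
        // Pack exactly as the mapper does: 10-character date, then the price as text.
        String packed = "2009-12-31" + 210.73;                    // "2009-12-31210.73"
        // Unpack exactly as the reducer does: the date is always the first 10 characters.
        String date = packed.substring(0, 10);                    // "2009-12-31"
        double price = Double.parseDouble(packed.substring(10));  // 210.73
        System.out.println(date + " -> " + price);
    }
}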
package hbasemr;

public class StockConstants {
    public static final byte[] PRICE_COLUMN_FAMILY = "p".getBytes();
    public static final byte[] HIGH_QUALIFIER = "high".getBytes();
    public static final byte[] LOW_QUALIFIER = "low".getBytes();
    public static final byte[] CLOSING_QUALIFIER = "close".getBytes();
    public static final byte[] VOLUME_QUALIFIER = "vol".getBytes();
    public static final byte[] INFO_COLUMN_FAMILY = "info".getBytes();
    public static final byte[] DATE_QUALIFIER = "date".getBytes();
}
hbase(main):001:0> create 'stockhighs', {NAME => 'info', VERSIONS => 1}
# cd ~/java/workspace/HBaseMR/
# yarn jar hbasemr.jar
hbase(main):002:0> scan 'stockhighs', {COLUMNS => ['info:date', 'info:close:toDouble']}
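The :toDouble suffix in the scan asks the shell to render the raw 8-byte value as a double instead of hex. The same check can be done from Java with a Get; a minimal sketch, again assuming the pre-1.0 HTable API, with the StockHighCheck class name and the symbol as placeholders:

package hbasemr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class StockHighCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "stockhighs");
        try {
            // Row key in 'stockhighs' is the bare symbol written by the reducer.
            Result result = table.get(new Get("AAPL".getBytes()));
            String date = Bytes.toString(
                    result.getValue(StockConstants.INFO_COLUMN_FAMILY, StockConstants.DATE_QUALIFIER));
            double maxClose = Bytes.toDouble(
                    result.getValue(StockConstants.INFO_COLUMN_FAMILY, StockConstants.CLOSING_QUALIFIER));
            System.out.println(date + " " + maxClose);
        } finally {
            table.close();
        }
    }
}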