Day 18 -- MapReduce Custom Data Types
1. Multiple File Output (MultipleOutputs)
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Multiple file output.
 * @author lyd
 *
 * Input data:
 *   hello qianfeng qianfeng world heloo
 *   Hello Hi Hello World QF QQ
 *   163.com 15900001111 17900001111
 *   @163.com @189.com $1100000 *[a-z]
 *
 * Expected output:
 *   az-r-00000   : hello 1, heloo 1, qianfeng 2, world 1
 *   AZ-r-00001   : Hello 2, Hi 1, QF 1, QQ 1
 *   part-r-00002 : 163.com 1, 15900001111 1, 17900001111 1
 *   part-r-00003 : @163.com 1, @189.com 1, $1100000 1, *[a-z] 1
 */
public class MultiOutputDemo {

    /**
     * Map phase.
     * @author lyd
     */
    public static class MyMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreElements()) {
                context.write(new Text(st.nextToken()), new Text(1 + ""));
            }
        }
    }

    /**
     * Reduce phase.
     * @author lyd
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // multi-file output writer
        MultipleOutputs<Text, Text> mos = null;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            for (Text t : values) {
                counter += Integer.parseInt(t.toString());
            }
            // route by the first character of the word
            String firstchar = key.toString().substring(0, 1);
            if (firstchar.matches("^[a-z]")) {
                mos.write("az", key, new Text(counter + ""));
            } else if (firstchar.matches("^[A-Z]")) {
                mos.write("AZ", key, new Text(counter + ""));
            } else if (firstchar.matches("^[0-9]")) {
                mos.write("09", key, new Text(counter + ""));
            } else {
                mos.write("others", key, new Text(counter + ""));
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // close the multi-file output writer
            mos.close();
        }
    }

    /**
     * Driver.
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = Job.getInstance(conf, "multipleoutput");
        job.setJarByClass(MultiOutputDemo.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // register the named outputs
        MultipleOutputs.addNamedOutput(job, "az", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "AZ", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "09", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "others", TextOutputFormat.class, Text.class, Text.class);

        job.setReducerClass(MyReducer.class);
        /*job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);*/

        setArgs(job, args);

        // submit the job
        int issuccessed = job.waitForCompletion(true) ? 0 : 1;
        // exit with the job status
        System.exit(issuccessed);
    }

    /**
     * Job argument handling.
     * @param job
     * @param args
     */
    public static void setArgs(Job job, String[] args) {
        try {
            if (args.length != 2) {
                System.out.println("arguments size is not enough!!!");
                System.out.println("Usage: yarn jar *.jar wordcount /inputdata /outputdata");
            }
            // set the input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // delete the output directory if it already exists
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if (fs.exists(op)) {
                fs.delete(op, true);
            }
            // set the output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
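One optional addition, not part of the original driver above: because the reducer writes only through MultipleOutputs, the default output format still creates empty part-r-* files. Hadoop's LazyOutputFormat delays creating the default output until something is actually written to it, so those empty files disappear:

// Optional: add to main() after the addNamedOutput(...) calls.
// Requires: import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);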
2. Secondary Sort -- In-Memory Sorting
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author lyd
 *
 * Input data (tab-separated):
 *   778 89
 *   8 55
 *   768 88
 *   768 68
 *   778 90
 *   798 68
 *   8 99
 *   8 12
 *   768 78
 *
 * Expected output:
 *   ------------
 *   8 12
 *   8 55
 *   8 99
 *   -----------
 *   768 68
 *   768 78
 *   768 88
 *   -----------
 *   778 89
 *   778 90
 *   -----------
 *   798 68
 */
public class SecondarySort implements Tool {

    /**
     * Map phase.
     * @author lyd
     */
    public static class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            context.write(new IntWritable(Integer.parseInt(fields[0])),
                    new IntWritable(Integer.parseInt(fields[1])));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
        }
    }

    /**
     * Reduce phase.
     * @author lyd
     */
    public static class MyReducer extends Reducer<IntWritable, IntWritable, Text, Text> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
        }

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            /**
             * e.g. key 8 with values list(55, 99, 12)
             */
            List<Integer> li = new ArrayList<Integer>();
            for (IntWritable t : values) {
                li.add(t.get());
            }
            // sort the buffered values in memory
            Collections.sort(li);
            // write only the largest value per key; the commented-out loop below
            // emits every sorted value, as shown in the sample output above
            context.write(new Text(key.toString()), new Text(li.get(li.size() - 1) + ""));
            /*for (Integer i : li) {
                context.write(new Text(key.toString()), new Text(i.toString()));
            }*/
            context.write(new Text("---------------------"), new Text(""));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
        }
    }

    public void setConf(Configuration conf) {
    }

    public Configuration getConf() {
        return new Configuration();
    }

    /**
     * Driver method.
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = Job.getInstance(conf, "SecondarySort");
        job.setJarByClass(SecondarySort.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        setArgs(job, args);

        // submit the job
        int issuccessed = job.waitForCompletion(true) ? 0 : 1;
        return issuccessed;
    }

    /**
     * Job argument handling.
     * @param job
     * @param args
     */
    public static void setArgs(Job job, String[] args) {
        try {
            if (args.length != 2) {
                System.out.println("arguments size is not enough!!!");
                System.out.println("Usage: yarn jar *.jar wordcount /inputdata /outputdata");
            }
            // set the input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // delete the output directory if it already exists
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if (fs.exists(op)) {
                fs.delete(op, true);
            }
            // set the output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Main entry point.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int isok = ToolRunner.run(new Configuration(), new SecondarySort(), args);
        System.exit(isok);
    }
}

Buffering every value of a key in a List works for small groups but can exhaust reducer memory for large ones; the next approach moves the second field into the key so the framework itself does the sorting.
3. Secondary Sort -- Key/Value (KV) Sorting
SecondarySort_kv.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @author lyd
 *
 * Input data (tab-separated):
 *   778 89
 *   8 55
 *   768 88
 *   768 68
 *   778 90
 *   798 68
 *   8 99
 *   8 12
 *   768 78
 *
 * Expected output:
 *   8 12
 *   8 55
 *   8 99
 *   -----------
 *   768 68
 *   768 78
 *   768 88
 *   -----------
 *   778 89
 *   778 90
 *   -----------
 *   798 68
 */
public class SecondarySort_kv implements Tool {

    /**
     * Map phase.
     * @author lyd
     */
    public static class MyMapper extends Mapper<Object, Text, SecondarySortWritable, IntWritable> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            // both fields go into the composite key so the framework sorts by (first, second)
            SecondarySortWritable ssw = new SecondarySortWritable(Integer.parseInt(fields[0]),
                    Integer.parseInt(fields[1]));
            context.write(ssw, new IntWritable(Integer.parseInt(fields[1])));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
        }
    }

    /**
     * Reduce phase.
     * @author lyd
     */
    public static class MyReducer extends Reducer<SecondarySortWritable, IntWritable, SecondarySortWritable, Text> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
        }

        @Override
        protected void reduce(SecondarySortWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
        }
    }

    public void setConf(Configuration conf) {
    }

    public Configuration getConf() {
        return new Configuration();
    }

    /**
     * Driver method.
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = Job.getInstance(conf, "SecondarySort");
        job.setJarByClass(SecondarySort_kv.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SecondarySortWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(SecondarySortWritable.class);
        job.setOutputValueClass(Text.class);

        setArgs(job, args);

        // submit the job
        int issuccessed = job.waitForCompletion(true) ? 0 : 1;
        return issuccessed;
    }

    /**
     * Job argument handling.
     * @param job
     * @param args
     */
    public static void setArgs(Job job, String[] args) {
        try {
            if (args.length != 2) {
                System.out.println("arguments size is not enough!!!");
                System.out.println("Usage: yarn jar *.jar wordcount /inputdata /outputdata");
            }
            // set the input path
            FileInputFormat.addInputPath(job, new Path(args[0]));
            // delete the output directory if it already exists
            FileSystem fs = FileSystem.get(job.getConfiguration());
            Path op = new Path(args[1]);
            if (fs.exists(op)) {
                fs.delete(op, true);
            }
            // set the output directory
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Main entry point.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int isok = ToolRunner.run(new Configuration(), new SecondarySort_kv(), args);
        System.exit(isok);
    }
}
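A detail the KV version leaves implicit: with more than one reducer, pairs that share the same first field can land on different reducers, because the default partitioner hashes the whole composite key. Below is a sketch of an optional partitioner on the first field alone; FirstFieldPartitioner is my name, not part of the original job, and it uses the getFirst() accessor of SecondarySortWritable shown further down.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Optional: send all pairs with the same first field to the same reducer.
public class FirstFieldPartitioner extends Partitioner<SecondarySortWritable, IntWritable> {
    @Override
    public int getPartition(SecondarySortWritable key, IntWritable value, int numPartitions) {
        return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
    }
}

// wired up in run() with: job.setPartitionerClass(FirstFieldPartitioner.class);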
Implementing a Custom Data Type
A custom data type must implement either the Writable interface or the WritableComparable interface.
Writable only requires the write() and readFields() methods.
WritableComparable additionally requires a compareTo() method, which is used for sorting.
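As a minimal sketch of the Writable-only case (PairWritable is a hypothetical name, not used in the examples below), serialization needs nothing beyond matching write() and readFields() implementations:

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical value type: serializable, but cannot be used as a sorted key.
public class PairWritable implements Writable {
    private int first;
    private int second;

    public PairWritable() { }  // Hadoop needs a no-arg constructor for deserialization

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);   // the field order written here ...
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();  // ... must match the order read here
        second = in.readInt();
    }
}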
SecondarySortWritable.java
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
/**
 * @ClassName SecondarySortWritable
 * @Author lyd
 * @Version 1.0
 * @Description Composite key (first, second) used by the key/value secondary sort.
 **/
public class SecondarySortWritable implements WritableComparable<SecondarySortWritable> {
private int first;
private int second;
public SecondarySortWritable(){
}
public SecondarySortWritable(int first, int second) {
this.first = first;
this.second = second;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.first);
dataOutput.writeInt(this.second);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.first = dataInput.readInt();
this.second = dataInput.readInt();
}
@Override
public int compareTo(SecondarySortWritable o) {
if(o == this){
return 0;
}
int tmp = this.first - o.first; // ascending, matching the sample output in the driver javadoc
// int tmp = o.first - this.first; // descending
// return tmp;
if(tmp != 0){
return tmp;
}
return this.second - o.second; // ascending
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SecondarySortWritable that = (SecondarySortWritable) o;
return first == that.first &&
second == that.second;
}
@Override
public int hashCode() {
return Objects.hash(first, second);
}
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
@Override
public String toString() {
return "SecondarySortWritable{" +
"first=" + first +
", second=" + second +
'}';
}
}
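A further optional piece of the classic secondary-sort pattern, not present in the original example: a grouping comparator that compares only the first field, so that all pairs sharing that field are delivered to a single reduce() call (FirstFieldGroupingComparator is my name):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Optional: group reduce() calls by the first field only.
public class FirstFieldGroupingComparator extends WritableComparator {
    protected FirstFieldGroupingComparator() {
        super(SecondarySortWritable.class, true); // true: instantiate keys for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        SecondarySortWritable left = (SecondarySortWritable) a;
        SecondarySortWritable right = (SecondarySortWritable) b;
        return Integer.compare(left.getFirst(), right.getFirst());
    }
}

// wired up in run() with: job.setGroupingComparatorClass(FirstFieldGroupingComparator.class);

With this in place the reducer would iterate its values and write the key once per value, since reduce() is then called once per first-field group and the key instance is updated as the values are consumed.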
4. Top N of the Aggregated Results
TopN.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.TreeSet;
/**
Find the top 3 of the word-count results:
hadoop hadoop hadoop hadoop hadoop is ok is nice is better
spark hbase hive flume nice
Output:
hadoop 5
is 3
nice 2
**/
public class TopN implements Tool {
private Configuration conf = new Configuration();
/**
* Map phase
* @author lyd
*
*/
public static class MyMapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
}
@Override
protected void map(Object key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String [] fields = line.split(" ");
for (String string : fields) {
context.write(new Text(string), new Text(1+""));
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
}
}
/**
* Reduce phase
* @author lyd
*
*/
public static class MyReducer extends Reducer<Text, Text, TopNWritable, NullWritable>{
TreeSet<TopNWritable> ts = new TreeSet<TopNWritable>();
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
}
int SIZE = 3; // N for the top-N
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
int counter = 0;
for (Text t : values) {
counter += Integer.parseInt(t.toString());
}
TopNWritable tn = new TopNWritable(key.toString(), counter);
// add each word/count object to the TreeSet (ordered by count, descending)
ts.add(tn);
if(ts.size() > SIZE){
// remove the smallest element so only the top N remain
ts.remove(ts.last());
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
for (TopNWritable topNWritable : ts) {
context.write(topNWritable, NullWritable.get());
}
}
}
public void setConf(Configuration conf) {
}
public Configuration getConf() {
return this.conf;
}
/**
* Driver method
*/
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
Job job = Job.getInstance(conf, "TopN");
job.setJarByClass(TopN.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(TopNWritable.class);
job.setOutputValueClass(NullWritable.class);
setArgs(job,args);
// submit the job
int issuccessed = job.waitForCompletion(true) ? 0 : 1;
return issuccessed;
}
/**
* Job argument handling
* @param job
* @param args
*/
public static void setArgs(Job job , String[] args){
try {
if(args.length != 2){
System.out.println("argments size is not enough!!!");
System.out.println("Useage :yarn jar *.jar wordcount /inputdata /outputdata");
}
// set the input path
FileInputFormat.addInputPath(job, new Path(args[0]));
// delete the output directory if it already exists
FileSystem fs = FileSystem.get(job.getConfiguration());
Path op = new Path(args[1]);
if(fs.exists(op)){
fs.delete(op, true);
}
// set the output directory
FileOutputFormat.setOutputPath(job, new Path(args[1]));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Main entry point
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int isok = ToolRunner.run(new Configuration(), new TopN(), args);
System.exit(isok);
}
}
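One caveat, my note rather than the author's: each reducer keeps its own TreeSet, so the output is only a global top 3 when the job runs with a single reduce task. That can be made explicit in run():

// Optional addition to run(): a single reducer makes the TreeSet hold the global top N.
job.setNumReduceTasks(1);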
Implementing the Custom Data Type
TopNWritable.java
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
/**
 * @ClassName TopNWritable
 * @Author lyd
 * @Version 1.0
 * @Description
 * Custom data type:
 * 1. Implement WritableComparable (sortable) or Writable (serializable only, not sortable).
 * 2. The fields written in write() and read in readFields() must match in order, type, and count.
 * 3. compareTo() is what the framework uses for sorting.
 **/
public class TopNWritable implements WritableComparable<TopNWritable> {
private String words;
private int count;
public TopNWritable(){
}
public TopNWritable(String words, int count) {
this.words = words;
this.count = count;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.words);
dataOutput.writeInt(this.count);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.words = dataInput.readUTF();
this.count = dataInput.readInt();
}
@Override
public int compareTo(TopNWritable o) {
if(o == this){
return 0;
}
int tmp = o.count - this.count;
if(tmp != 0){
return tmp;
}
return this.words.compareTo(o.words);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopNWritable that = (TopNWritable) o;
return count == that.count &&
Objects.equals(words, that.words);
}
@Override
public int hashCode() {
return Objects.hash(words, count);
}
@Override
public String toString() {
return "words='" + words + ", count=" + count;
}
}