Simple Examples of Common Hadoop Algorithms
Example 1: Sort the following data, where the final surplus is income minus expenses and records are ordered from largest to smallest by that surplus. The input is tab-separated lines of account, income, and expenses. [The sample data was shown as an image and is not reproduced here.]
After SumStep runs, income and expenses have been aggregated per account. [Its output was likewise shown as an image.]
After SortStep runs, the result is that aggregated output, ordered from largest to smallest by surplus.
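Since the original sample is unavailable, here is a purely hypothetical input in the expected format (tab-separated account, income, expenses; the names and amounts are invented for illustration):
zhangsan    6000    0
zhangsan    0       1000
lisi        2000    0
lisi        0       900
SumStep would then emit one aggregated line per account in the form account \t income \t expenses \t surplus (for example zhangsan \t 6000.0 \t 1000.0 \t 5000.0), and SortStep would list zhangsan (surplus 5000.0) ahead of lisi (surplus 1100.0).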
The code is as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class InfoBean implements WritableComparable<InfoBean> {
    private String account;
    private double income;
    private double expenses;
    private double surplus;
    public void set(String account, double income, double expenses) {
        this.account = account;
        this.income = income;
        this.expenses = expenses;
        this.surplus = income - expenses;
    }
    @Override
    public String toString() {
        return this.income + "\t" + this.expenses + "\t" + this.surplus;
    }
    /**
     * serialize
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(account);
        out.writeDouble(income);
        out.writeDouble(expenses);
        out.writeDouble(surplus);
    }
    /**
     * deserialize (fields must be read in the order they were written)
     */
    public void readFields(DataInput in) throws IOException {
        this.account = in.readUTF();
        this.income = in.readDouble();
        this.expenses = in.readDouble();
        this.surplus = in.readDouble();
    }
    /**
     * Order by surplus, descending, to match the stated requirement; ties are broken by income, descending.
     */
    public int compareTo(InfoBean o) {
        if (this.surplus == o.getSurplus()) {
            return this.income > o.getIncome() ? -1 : 1;
        }
        return this.surplus > o.getSurplus() ? -1 : 1;
    }
    public String getAccount() {
        return account;
    }
    public void setAccount(String account) {
        this.account = account;
    }
    public double getIncome() {
        return income;
    }
    public void setIncome(double income) {
        this.income = income;
    }
    public double getExpenses() {
        return expenses;
    }
    public void setExpenses(double expenses) {
        this.expenses = expenses;
    }
    public double getSurplus() {
        return surplus;
    }
    public void setSurplus(double surplus) {
        this.surplus = surplus;
    }
}
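Note that InfoBean implements WritableComparable rather than plain Writable: it later serves as the map output key in SortStep, and MapReduce keys must be comparable so the framework can sort them during the shuffle.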
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumStep {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SumStep.class);
        job.setMapperClass(SumMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
    public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
        private InfoBean bean = new InfoBean();
        private Text k = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the line: account \t income \t expenses
            String line = value.toString();
            String[] fields = line.split("\t");
            // extract the useful fields
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            k.set(account);
            bean.set(account, income, expenses);
            context.write(k, bean);
        }
    }
    public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {
        private InfoBean bean = new InfoBean();
        @Override
        protected void reduce(Text key, Iterable<InfoBean> v2s, Context context) throws IOException, InterruptedException {
            double in_sum = 0;
            double out_sum = 0;
            for (InfoBean b : v2s) {
                in_sum += b.getIncome();
                out_sum += b.getExpenses();
            }
            // the account is already in the key, so it can be left empty here
            bean.set("", in_sum, out_sum);
            context.write(key, bean);
        }
    }
}
The input here is the output of SumStep rather than the original file. The two jobs could of course also be chained into a single run; a minimal sketch of that follows the SortStep code below.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortStep {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortStep.class);
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
    public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
        private InfoBean bean = new InfoBean();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // input lines are SumStep output: account \t income \t expenses \t surplus
            String line = value.toString();
            String[] fields = line.split("\t");
            String account = fields[0];
            double income = Double.parseDouble(fields[1]);
            double expenses = Double.parseDouble(fields[2]);
            bean.set(account, income, expenses);
            // the bean is the key, so the framework sorts records by InfoBean.compareTo
            context.write(bean, NullWritable.get());
        }
    }
    public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
        private Text k = new Text();
        @Override
        protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context) throws IOException, InterruptedException {
            String account = bean.getAccount();
            k.set(account);
            context.write(k, bean);
        }
    }
}
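As mentioned above, the two jobs can also be chained in a single driver by running the sum job to completion and pointing the sort job at its output directory. A minimal sketch, assuming the SumStep and SortStep classes above are on the classpath (the driver class name SumThenSort and the use of args[1] as an intermediate directory are my own choices, not from the original post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SumThenSort {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // first job: aggregate income/expenses per account
        Job sumJob = Job.getInstance(conf, "sum-step");
        sumJob.setJarByClass(SumThenSort.class);
        sumJob.setMapperClass(SumStep.SumMapper.class);
        sumJob.setReducerClass(SumStep.SumReducer.class);
        sumJob.setOutputKeyClass(Text.class);
        sumJob.setOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(sumJob, new Path(args[0]));
        FileOutputFormat.setOutputPath(sumJob, new Path(args[1]));
        if (!sumJob.waitForCompletion(true)) {
            System.exit(1); // do not start the sort if the aggregation failed
        }
        // second job: sort the aggregated records by surplus
        Job sortJob = Job.getInstance(conf, "sort-step");
        sortJob.setJarByClass(SumThenSort.class);
        sortJob.setMapperClass(SortStep.SortMapper.class);
        sortJob.setReducerClass(SortStep.SortReducer.class);
        sortJob.setMapOutputKeyClass(InfoBean.class);
        sortJob.setMapOutputValueClass(NullWritable.class);
        sortJob.setOutputKeyClass(Text.class);
        sortJob.setOutputValueClass(InfoBean.class);
        FileInputFormat.setInputPaths(sortJob, new Path(args[1]));
        FileOutputFormat.setOutputPath(sortJob, new Path(args[2]));
        System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
    }
}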
Example 2: Inverted index. The process is as follows:
Map phase
<0,"hello tom">
....
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
--------------------------------------------------------
Combiner phase
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
--------------------------------------------------------
Reducer phase
<"hello",{"a.txt->5","b.txt->3"}>
context.write("hello","a.txt->5 b.txt->3");
-------------------------------------------------------
hello "a.txt->5 b.txt->3"
tom "a.txt->2 b.txt->1"
kitty "a.txt->1"
.......
The code is as follows:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndex {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//set the jar containing this class
job.setJarByClass(InverseIndex.class);
//configure the Mapper
job.setMapperClass(IndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));//words.txt
//configure the Reducer
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setCombinerClass(IndexCombiner.class);
//submit the job
job.waitForCompletion(true);
}
public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text>{
private Text k = new Text();
private Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");
FileSplit inputSplit = (FileSplit) context.getInputSplit();
Path path = inputSplit.getPath();
String name = path.getName();
for(String f : fields){
k.set(f + "->" + name);
v.set("1");
context.write(k, v);
}
}
}
public static class IndexCombiner extends Reducer<Text, Text, Text, Text>{
private Text k = new Text();
private Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String[] fields = key.toString().split("->");
long sum = 0;
for(Text t : values){
sum += Long.parseLong(t.toString());
}
k.set(fields[0]);
v.set(fields[1] + "->" + sum);
context.write(k, v);
}
}
public static class IndexReducer extends Reducer<Text, Text, Text, Text>{
private Text v = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String value = "";
for(Text t : values){
value += t.toString() + " ";
}
v.set(value);
context.write(key, v);
}
}
}
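A note on the combiner's design: a combiner must consume and produce the same key/value types as the map output (Text/Text here), but it is free to rewrite the key's content. IndexCombiner collapses the per-occurrence pairs <word->file, 1> into a single <word, file->count> pair per map task, which is what lets IndexReducer group by the word alone and simply concatenate the per-file counts. Strictly speaking, Hadoop does not guarantee that a combiner runs exactly once (it may run zero times or several times), so this re-keying trick, while common in tutorials, relies on behavior that production code should not depend on.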
Example 3: Use a Partitioner so that identical or similar records are sent to the same reducer:
The data format is as follows; the columns are the phone number, upload traffic, and download traffic. [The sample file was shown as an image and is not reproduced here.]
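A hypothetical line in this format (tab-separated, values invented for illustration) might be 13926251106 240 960, a 139-prefixed number whose records would fall into partition 1 under the partitioner below.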
The code is as follows:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataBean implements Writable {
private String telNo;
private long upPayLoad;
private long downPayLoad;
private long totalPayLoad;
public DataBean(){}
public DataBean(String telNo, long upPayLoad, long downPayLoad) {
super();
this.telNo = telNo;
this.upPayLoad = upPayLoad;
this.downPayLoad = downPayLoad;
this.totalPayLoad = upPayLoad + downPayLoad;
}
@Override
public String toString() {
return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
}
/**
* serialization
* Note: 1. the field types and 2. the field order must match readFields()
*/
public void write(DataOutput out) throws IOException {
out.writeUTF(telNo);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
out.writeLong(totalPayLoad);
}
/**
* deserialization
*/
public void readFields(DataInput in) throws IOException {
this.telNo = in.readUTF();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
this.totalPayLoad = in.readLong();
}
public String getTelNo() {
return telNo;
}
public void setTelNo(String telNo) {
this.telNo = telNo;
}
public long getUpPayLoad() {
return upPayLoad;
}
public void setUpPayLoad(long upPayLoad) {
this.upPayLoad = upPayLoad;
}
public long getDownPayLoad() {
return downPayLoad;
}
public void setDownPayLoad(long downPayLoad) {
this.downPayLoad = downPayLoad;
}
public long getTotalPayLoad() {
return totalPayLoad;
}
public void setTotalPayLoad(long totalPayLoad) {
this.totalPayLoad = totalPayLoad;
}
}
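DataBean implements plain Writable rather than WritableComparable because it is only ever used as a value: the key in this job is the phone number carried in a Text, so the bean itself never needs to be sorted.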
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DataCount.class);
job.setMapperClass(DCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DataBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(DCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DataBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//replace the default HashPartitioner with a custom Partitioner
job.setPartitionerClass(ServiceProviderPartitioner.class);
//the number of reduce tasks must be set here (at least 4, one per partition used below)
job.setNumReduceTasks(Integer.parseInt(args[2]));
job.waitForCompletion(true);
}
public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//read the line
String line = value.toString();
//split it into fields
String[] fields = line.split("\t");
//extract the useful fields and wrap them in an object
//phone number
String telNo = fields[0];
//upload traffic
long up = Long.parseLong(fields[1]);
//download traffic
long down = Long.parseLong(fields[2]);
//wrap the fields in a new DataBean
DataBean bean = new DataBean(telNo, up, down);
//emit
context.write(new Text(telNo), bean);
}
}
public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{
@Override
protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
throws IOException, InterruptedException {
//accumulators
long up_sum = 0;
long down_sum = 0;
//iterate over v2s and sum the traffic
for(DataBean bean : v2s){
up_sum += bean.getUpPayLoad();
down_sum += bean.getDownPayLoad();
}
//wrap the sums (the account field is already in the key)
DataBean bean = new DataBean("", up_sum, down_sum);
//emit
context.write(key, bean);
}
}
public static class ServiceProviderPartitioner extends Partitioner<Text, DataBean>{
private static Map<String, Integer> providerMap = new HashMap<String, Integer>();
static {
providerMap.put("139", 1);
providerMap.put("138", 2);
providerMap.put("159", 3);
}
@Override
public int getPartition(Text key, DataBean value, int number) {
String telNo = key.toString();
String pcode = telNo.substring(0, 3);
Integer p = providerMap.get(pcode);
if(p == null){
p = 0;
}
return p;
}
}
}
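Because ServiceProviderPartitioner can return partition numbers 0 through 3, the job must be launched with at least four reduce tasks, for example (the jar and path names here are hypothetical):
hadoop jar datacount.jar DataCount /dataIn /dataOut 4
Each of the four output files part-r-00000 through part-r-00003 then holds the totals for one partition.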
Example 4: Implement the following simple algorithm; the MR program involves reduce-side grouping and related concepts:
# When the first column is the same, output the minimum of the second column
3 3
3 2
3 1
2 2
2 1
1 1
----------Result---------
3 1
2 1
1 1
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class GroupApp {
static final String INPUT_PATH = "hdfs://xxx:9000/input";
static final String OUT_PATH = "hdfs://xxx:9000/out";
public static void main(String[] args) throws Exception{
final Configuration configuration = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
if(fileSystem.exists(new Path(OUT_PATH))){
fileSystem.delete(new Path(OUT_PATH), true);
}
final Job job = Job.getInstance(configuration, GroupApp.class.getSimpleName());
//1.1 specify the input path
FileInputFormat.setInputPaths(job, INPUT_PATH);
//specify the class that parses the input file
job.setInputFormatClass(TextInputFormat.class);
//1.2 specify the custom Mapper class
job.setMapperClass(MyMapper.class);
//specify the <k2,v2> output types
job.setMapOutputKeyClass(NewK2.class);
job.setMapOutputValueClass(LongWritable.class);
//1.3 specify the partitioner class
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 sorting and grouping: group records by the first column only
job.setGroupingComparatorClass(MyGroupingComparator.class);
//1.5 (optional) combining
//2.2 specify the custom Reducer class
job.setReducerClass(MyReducer.class);
//specify the <k3,v3> output types
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
//2.3 specify the output path
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
//specify the output format class
job.setOutputFormatClass(TextOutputFormat.class);
//submit the job and wait for completion
job.waitForCompletion(true);
}
static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String[] splited = value.toString().split("\t");
final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
context.write(k2, v2);
}
}
static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
@Override
protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
long min = Long.MAX_VALUE;
for (LongWritable v2 : v2s) {
if(v2.get() < min){
min = v2.get();
}
}
context.write(new LongWritable(k2.first), new LongWritable(min));
}
}
/**
* Q: Why define this class?
* A: Because the original v2 could not take part in the sort, the original k2 and v2 are wrapped together into one class, which becomes the new k2.
*/
static class NewK2 implements WritableComparable<NewK2>{
Long first;
Long second;
public NewK2(){}
public NewK2(long first, long second){
this.first = first;
this.second = second;
}
public void readFields(DataInput in) throws IOException {
this.first = in.readLong();
this.second = in.readLong();
}
public void write(DataOutput out) throws IOException {
out.writeLong(first);
out.writeLong(second);
}
/**
* Called when k2 values are sorted.
* When the first columns differ, sort ascending by the first column; when they are equal, sort ascending by the second column.
*/
public int compareTo(NewK2 o) {
// compare via Long.compareTo to avoid overflow from long subtraction
final int cmp = this.first.compareTo(o.first);
if(cmp != 0){
return cmp;
}
return this.second.compareTo(o.second);
}
@Override
public int hashCode() {
return this.first.hashCode()+this.second.hashCode();
}
@Override
public boolean equals(Object obj) {
if(!(obj instanceof NewK2)){
return false;
}
NewK2 oK2 = (NewK2)obj;
// Long values must be compared with equals(), not ==
return this.first.equals(oK2.first) && this.second.equals(oK2.second);
}
}
/**
* Q: Why define this custom class?
* A: The requirement is to group by the first column alone, but NewK2's comparison rules do not group that way, so a custom grouping comparator is needed.
*/
static class MyGroupingComparator implements RawComparator<NewK2>{
public int compare(NewK2 o1, NewK2 o2) {
return o1.first.compareTo(o2.first);
}
/**
* @param b1 the first byte array being compared
* @param s1 the offset at which the first key starts in b1
* @param l1 the length of the first key
*
* @param b2 the second byte array being compared
* @param s2 the offset at which the second key starts in b2
* @param l2 the length of the second key
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2,
int s2, int l2) {
// compare only the first 8 bytes, i.e. the serialized long `first`
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
}
}
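One consequence of this design is worth spelling out: since NewK2 sorts ascending by the second column within each first-column group, the first value the reducer sees in a group is already the minimum, so MyReducer's min-tracking loop could be reduced to reading just the first element of v2s. Pairing a composite key with a grouping comparator like this is the standard secondary-sort pattern in MapReduce.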
Example 5: Use MapReduce to find the maximum value in a huge data file, relying on the Mapper's cleanup() method, since cleanup() runs only after all of a task's map() calls have completed.
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopKApp {
static final String INPUT_PATH = "hdfs://xxx:9000/input";
static final String OUT_PATH = "hdfs://xxx:9000/out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUT_PATH);
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}
final Job job = Job.getInstance(conf, TopKApp.class.getSimpleName());
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
}
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
long max = Long.MIN_VALUE;
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//track the largest value this map task has seen
final long temp = Long.parseLong(v1.toString());
if(temp > max){
max = temp;
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
//runs once after all map() calls: emit this task's local maximum
context.write(new LongWritable(max), NullWritable.get());
}
}
static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
long max = Long.MIN_VALUE;
@Override
protected void reduce(LongWritable k2, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//each incoming key is one map task's local maximum
final long temp = k2.get();
if(temp > max){
max = temp;
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
//emit the global maximum once all keys have been processed
context.write(new LongWritable(max), NullWritable.get());
}
}
}
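Each map task emits only its local maximum, a single record written from cleanup(), and because the job never calls setNumReduceTasks() it runs with the default single reducer, whose own cleanup() then writes the global maximum exactly once.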
Example 6: Counters:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountApp {
static final String INPUT_PATH = "hdfs://xxx:9000/hello";
static final String OUT_PATH = "hdfs://xxx:9000/out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
final Path outPath = new Path(OUT_PATH);
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}
final Job job = Job.getInstance(conf , WordCountApp.class.getSimpleName());
//1.1 specify where the input files live
FileInputFormat.setInputPaths(job, INPUT_PATH);
//specify how the input file is parsed into key-value pairs, one pair per line
//job.setInputFormatClass(TextInputFormat.class);
//1.2 specify the custom Mapper class
job.setMapperClass(MyMapper.class);
//the map output <k,v> types; these calls may be omitted when <k3,v3> matches <k2,v2>
//job.setMapOutputKeyClass(Text.class);
//job.setMapOutputValueClass(LongWritable.class);
//1.3 partitioning
//job.setPartitionerClass(HashPartitioner.class);
//a single reduce task runs by default
//job.setNumReduceTasks(1);
//1.4 sorting and grouping
//1.5 combining
//2.2 specify the custom Reducer class
job.setReducerClass(MyReducer.class);
//specify the reduce output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//2.3 specify where the output is written
FileOutputFormat.setOutputPath(job, outPath);
//specify the output format class
//job.setOutputFormatClass(TextOutputFormat.class);
//submit the job and wait for completion
job.waitForCompletion(true);
}
/**
* KEYIN i.e. k1: the byte offset of the line
* VALUEIN i.e. v1: the text content of the line
* KEYOUT i.e. k2: a word appearing in the line
* VALUEOUT i.e. v2: the count for that word, the constant 1
*/
static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
final String line = v1.toString();
if(line.contains("hello")){
//record that the sensitive word appeared in this line
helloCounter.increment(1L);
}
final String[] splited = line.split("\t");
for (String word : splited) {
context.write(new Text(word), new LongWritable(1));
}
}
}
/**
* KEYIN i.e. k2: a word appearing in the text
* VALUEIN i.e. v2: the per-line counts for that word
* KEYOUT i.e. k3: a distinct word in the text
* VALUEOUT i.e. v3: the total number of occurrences of that word
*/
static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context ctx) throws IOException, InterruptedException {
long times = 0L;
for (LongWritable count : v2s) {
times += count.get();
}
ctx.write(k2, new LongWritable(times));
}
}
}
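When the job finishes, waitForCompletion(true) prints all counters to the client console, so the custom counter appears there alongside the built-in ones under its group name, roughly as:
Sensitive Words
hello=N
where N is the number of input lines that contained the word "hello".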