MapReduce練習案例3 - 自定義分割槽
阿新 • • 發佈:2021-01-27
技術標籤:大資料MapReduceHadoop大資料hadoopmapreduce
更多大資料專欄文章請點選 : –> 小馬哥大資料專欄博文導航 <–
案例3: 手機號碼分割槽
3.1 需求
在案例一的基礎上,繼續完善,將不同的手機號分到不同的資料檔案的當中去,需要自定義分割槽來實現,這裡我們自定義來模擬分割槽,將以下數字開頭的手機號進行分開
135 開頭資料到一個分割槽檔案
136 開頭資料到一個分割槽檔案
137 開頭資料到一個分割槽檔案
其他分割槽
3.2 思路
1, 在案例1的基礎上, 新增分割槽;
2, 分割槽實現: 使用MapReduce的自定義分割槽技術, 實現Partitioner邏輯;
3, 在Job啟動類中設定job的分割槽類.
3.3 程式碼
案例1中程式碼基礎上(程式碼略, 請翻看前面博文), 新增自定義Paritioner類
Partitioner類
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PhoneNumPartitioner extends Partitioner<Text, Text> {
/**
* 分割槽邏輯
* 135 開頭資料到一個分割槽檔案
* 136 開頭資料到一個分割槽檔案
* 137 開頭資料到一個分割槽檔案
* 其他分割槽
*
* @param key : Mapper任務的輸出key
* @param value: : Mapper任務的輸出value
* @param numPartitions: 分幾個區, 就需要有幾個ReduceTask, 這裡設定分割槽個數
* @return
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {
String phoneNum = key.toString();
if (phoneNum.startsWith("135")) {
return 0;
} else if (phoneNum.startsWith("136")) {
return 1;
} else if ( phoneNum.startsWith("137")) {
return 2;
} else {
return 3;
}
}
}
Job啟動類
關鍵兩行程式碼: 星號位置
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MainJob extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//1,建立一個Job類
Job job = Job.getInstance(super.getConf(), "Example1_job");
//2, 設定輸入類,輸入路徑
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1"));
//3, 設定Mapper類, map輸出型別
job.setMapperClass(Example1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//** 設定分割槽類 **
job.setPartitionerClass(PhoneNumPartitioner.class);
//4, 設定Reducer類, reduce輸出型別
job.setReducerClass(Example1Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Flow.class);
//** 設定ReduceTask的個數 **
job.setNumReduceTasks(4);
//5, 設定輸出類, 輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("D:\\devDoc\\hadoop\\datas\\example1_partitioner_result"));
//6, 啟動Job, 等待Job執行
boolean completion = job.waitForCompletion(true);
return completion?1:0;
}
public static void main(String[] args) {
int run = 0;
try {
run = ToolRunner.run(new Configuration(), new MainJob(), args);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(run);
}
}
輸出結果
計數器顯示
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 30
File System Counters
FILE: Number of bytes read=45014
FILE: Number of bytes written=2461514
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=23
Map output records=23
Map output bytes=2830
Map output materialized bytes=2900
Input split bytes=112
Combine input records=0
Combine output records=0
Reduce input groups=21
Reduce shuffle bytes=2900
Reduce input records=23
Reduce output records=21
Spilled Records=46
Shuffled Maps =4
Failed Shuffles=0
Merged Map outputs=4
GC time elapsed (ms)=9
Total committed heap usage (bytes)=956825600
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2583
File Output Format Counters
Bytes Written=604
Process finished with exit code 1
結果檔案顯示
2021/01/26 17:58 59 part-r-00000
2021/01/26 17:58 85 part-r-00001
2021/01/26 17:58 75 part-r-00002
2021/01/26 17:58 337 part-r-00003
2021/01/26 17:58 0 _SUCCESS
開啟其中一個檔案
13480253104 3 3 180 180
13823070001 6 3 360 180
13826544101 4 0 264 0
13922314466 12 12 3008 3720
13925057413 69 63 11058 48243
13926251106 4 0 240 0
......
更多大資料專欄文章請點選 : –> 小馬哥大資料專欄博文導航 <–