MapReduce之自定義OutputFormat
阿新 • • 發佈:2020-08-05
@[toc]
## OutputFormat介面實現類
`OutputFormat`是`MapReduce`輸出的基類,所有實現`MapReduce`輸出都實現了`OutputFormat`介面。下面介紹幾種常見的OutputFormat實現類。
- 文字輸出`TextoutputFormat`
預設的輸出格式是TextOutputFormat,==它把每條記錄寫為文字行==。它的鍵和值可以是任意型別,因為TextOutputFormat呼叫toString()方法把它們轉換為==字串==。
- `SequenceFileOutputFormat`
將SecquenceFileOutputFormat輸出==作為後續MapReduce任務的輸入==,這便是一種好的輸出格式,因為它的==格式緊湊,很容易被壓縮。==
- ==**自定義OutputFormat**==
根據使用者需求,自定義實現輸出。
## 自定義OutputFormat使用場景及步驟
### 使用場景
- 為了實現控制最終檔案的輸出路徑和輸出格式,可以自定義OutputFormat。
例如:要在一個MapReduce程式中根據資料的不同輸出兩類結果到不同目錄,這類靈活的輸出需求可以通過自定義OutputFormat來實現。
- 自定義OutputFormat步驟
(1)自定義一個類繼承`FileOutputFormat`。
(2)改寫`RecordWriter`,具體改寫輸出資料的方法`write()`。
## 自定義OutputFormat 案例實操
**需求**
過濾輸入的log日誌,包含atguigu的網站輸出到==e:/atguigu.log==,不包含atguigu的網站輸出到==e:/other.log==。
**輸入資料**
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200805165733702.png)
**==什麼時候需要Reduce==**
①合併
②需要對資料排序
==所以本案例不需要Reduce階段,key-value不需要實現序列化==
**CustomOFMapper.java**
```java
public class CustomOFMapper extends Mapper{
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String content = value.toString();
//value不需要,但是不能用Null這個關鍵字,要使用NullWritable物件
context.write(content+"\r\n", NullWritable.get());
}
}
```
**MyOutPutFormat.java**
```java
public class MyOutPutFormat extends FileOutputFormat{
@Override
public RecordWriter getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
return new MyRecordWriter(job);//傳遞job物件,才能在RecordWriter中獲取配置
}
}
```
**MyRecordWriter.java**
```java
public class MyRecordWriter extends RecordWriter {
private Path atguiguPath=new Path("e:/atguigu.log");
private Path otherPath=new Path("e:/other.log");
private FSDataOutputStream atguguOS ;
private FSDataOutputStream otherOS ;
private FileSystem fs;
private TaskAttemptContext context;
public MyRecordWriter(TaskAttemptContext job) throws IOException {
context=job;
Configuration conf = job.getConfiguration();
fs=FileSystem.get(conf);
atguiguOS = fs.create(atguiguPath);
otherOS = fs.create(otherPath);
}
// 將key-value寫出到檔案
@Override
public void write(String key, NullWritable value) throws IOException, InterruptedException {
if (key.contains("atguigu")) {
atguguOS.write(key.getBytes());//寫到atguigu.log
//統計輸出的含有atguigu字串的key-value個數
context.getCounter("MyCounter", "atguiguCounter").increment(1);
}else {
otherOS.write(key.getBytes());//寫到other.log
context.getCounter("MyCounter", "otherCounter").increment(1);
}
}
// 關閉流
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (atguguOS != null) {
IOUtils.closeStream(atguguOS);
}
if (otherOS != null) {
IOUtils.closeStream(otherOS);
}
if (fs != null) {
fs.close();
}
}
}
```
**CustomOFDriver.java**
```java
public class CustomOFDriver {
public static void main(String[] args) throws Exception {
Path inputPath=new Path("e:/mrinput/outputformat");
Path outputPath=new Path("e:/mroutput/outputformat");
//作為整個Job的配置
Configuration conf = new Configuration();
//保證輸出目錄不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
// ①建立Job
Job job = Job.getInstance(conf);
//重點,設定為自定義的輸出格式
job.setJarByClass(CustomOFDriver.class);
// ②設定Job
// 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別
job.setMapperClass(CustomOFMapper.class);
// 設定輸入目錄和輸出目錄
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 設定輸入和輸出格式
job.setOutputFormatClass(MyOutPutFormat.class);
// 取消reduce階段。設定為0,預設為1
job.setNumReduceTasks(0);
// ③執行Job
job.waitForCompletion(true);
}
}
```
輸出檔案:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200805170615910.png)
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200805170629430.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFF