
Big Data Learning: Custom Input (13)


Part 1: Custom Input

Requirement: merge many small files into one SequenceFile (which then stores all of the small files).

Storage format: file path + file contents, e.g.

c:/a.txt i am hunter henshuai

c:/b.txt i love delireba

InputFormat (custom, so the file path can be attached as the key)

The code:

1: The custom FileInputFormat

package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * @author Dawn
 * @date 2019-05-09 22:58:19
 * @version 1.0
 * Custom input: writing this piece of the framework ourselves.
 * Requirement:
 * merge many small files into one SequenceFile (which then stores all of the small files)
 * 	storage format: file path + file contents
 * 	c:/a.txt i am hunter henshuai
 * 	c:/b.txt i love delireba
 *
 * 	InputFormat (custom, so the file path can be attached as the key)
 */


//1. Create the custom InputFormat.
//Why NullWritable and BytesWritable?
//Because the key is left empty for now; at the map output stage we switch the output types to Text and BytesWritable.
public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>{


	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		// never split the original files: each small file stays whole
		return false;
	}
	
	@Override
	public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		
		FuncRecordReader recordReader = new FuncRecordReader();
		return recordReader;
	}

}
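Returning false from isSplitable is the key design decision here: each small file is kept as a single, unsplittable InputSplit, so one pass of the RecordReader can read the whole file into a single record.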


2: The custom RecordReader

package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @author Dawn
 * @date 2019-05-09 23:12:03
 * @version 1.0
 *
 */
//2. Implement the RecordReader.
public class FuncRecordReader extends RecordReader<NullWritable, BytesWritable>{

	boolean isProcess = false;
	FileSplit split;
	Configuration conf;
	BytesWritable value = new BytesWritable();
	
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)  {
		// cache the split for this file
		this.split = (FileSplit) split;
		// cache the job configuration
		conf = context.getConfiguration();
		
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		
		if(!isProcess) {
			// 1. create a buffer sized to the split, i.e. the whole file
			byte[] buf = new byte[(int) split.getLength()];
			FSDataInputStream fis = null;
			FileSystem fs = null;
			
			try {
				// 2. get the file's path
				Path path = split.getPath();
				
				// 3. get the file system for that path
				fs = path.getFileSystem(conf);
				
				// 4. open an input stream
				fis = fs.open(path);
				
				// 5. read the whole file into the buffer
				IOUtils.readFully(fis, buf, 0, buf.length);
				
				// 6. copy the buffer into the output value
				value.set(buf, 0, buf.length);
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				// close only the stream; the FileSystem instance is cached and
				// shared, so closing it here could break later reads in the job
				IOUtils.closeStream(fis);
			}
			
			isProcess=true;
			
			return true;
		}
		return false;
	}

	@Override
	public NullWritable getCurrentKey() throws IOException, InterruptedException {
		// the key is left empty; the mapper fills in the path
		return NullWritable.get();
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// one record per split: either nothing or everything has been read
		return isProcess ? 1 : 0;
	}

	@Override
	public void close() throws IOException {
		// nothing to close here; the stream is closed in nextKeyValue()
	}

}


3: The MapReduce job

Mapper:

package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @author Dawn
 * @date 2019-05-09 23:25:29
 * @version 1.0
 * 
 */
public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable>{

	Text k=new Text();
	
	@Override
	protected void setup(Context context)
			throws IOException, InterruptedException {
		// 1. get this mapper's split
		FileSplit split = (FileSplit) context.getInputSplit();
		
		// 2. get its path
		Path path = split.getPath();
		
		// 3. the key is the full path, directory and file name included
		k.set(path.toString());
	}
	
	
	@Override
	protected void map(NullWritable key, BytesWritable value, Context context)
			throws IOException, InterruptedException {
		// emit one record per file: (file path, file bytes)
		context.write(k, value);
	}


}
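Note that setup runs once per split, and since isSplitable returns false a split here is exactly one file, so the key k set in setup matches the one record that map will emit for that file.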


Reducer:

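The driver below references a SequenceFileReducer class. Here is a minimal sketch consistent with the job's types (Text key, BytesWritable value), assuming the reducer simply forwards each file's bytes unchanged:

package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

	@Override
	protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
			throws IOException, InterruptedException {
		// each key (a file path) carries exactly one value: that file's bytes
		for (BytesWritable value : values) {
			context.write(key, value);
		}
	}
}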


Driver:

package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * @author Dawn
 * @date 2019-05-09 23:32:39
 * @version 1.0
 * 
 */
public class SequenceDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// 1. get the job instance
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2. set the jar
		job.setJarByClass(SequenceDriver.class);

		// 3. set the custom mapper and reducer classes
		job.setMapperClass(SequenceFileMapper.class);
		job.setReducerClass(SequenceFileReducer.class);
		
		// use the custom input format
		job.setInputFormatClass(FuncFileInputFormat.class);
		// write the output as a SequenceFile
		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		// 4. map output types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(BytesWritable.class);

		// 5. final (reduce) output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);

		// 6. input path and result path
		FileInputFormat.setInputPaths(job, new Path("f:/temp/inputSelf/*.txt"));
		FileOutputFormat.setOutputPath(job, new Path("f:/temp/inputSelfout1"));

		// 7. submit the job
		boolean rs = job.waitForCompletion(true);
		System.out.println(rs ? 0 : 1);

	}

}
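With plain f:/... paths and a default Configuration, the job runs against the local file system (Hadoop local mode), which is convenient for testing on Windows; on a cluster the paths would point at HDFS instead.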


Screenshots of the run:

Input:

[screenshot: the small .txt files under f:/temp/inputSelf]

Output (bear with it: the values are raw BytesWritable bytes, so the dump is not very readable):

[screenshot: the records of the generated SequenceFile]
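To check the result beyond the raw dump, a small standalone reader can print each (path, size) pair. A minimal sketch using Hadoop's SequenceFile.Reader API (the class name SequenceReadCheck is made up for illustration; part-r-00000 is the default output file name for a single reducer):

package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceReadCheck {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		// hypothetical path: the driver's output directory plus the default part file
		Path path = new Path("f:/temp/inputSelfout1/part-r-00000");

		SequenceFile.Reader reader = null;
		try {
			reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
			Text key = new Text();
			BytesWritable value = new BytesWritable();
			// each record is (original file path, file bytes)
			while (reader.next(key, value)) {
				System.out.println(key + " -> " + value.getLength() + " bytes");
			}
		} finally {
			IOUtils.closeStream(reader);
		}
	}
}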
