Big Data Learning: Custom Input (13)
阿新 • Published 2019-05-12
Part I: Custom Input
Requirement: merge multiple small files into a single SequenceFile (one file that stores all the small files).
Storage format: file path + file contents, for example:
c:/a.txt i am hunter henshuai
c:/b.txt i love delireba
InputFormat (custom, so the file path can be attached as the key)
Code:
1: Writing the custom FileInputFormat
package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * @author Dawn
 * @date 2019-05-09 22:58:19
 * @version 1.0
 * Custom input: write the input framework classes ourselves.
 * Requirement:
 * merge multiple small files into a single SequenceFile (which stores the small files).
 * Storage format: file path + file contents
 * c:/a.txt i am hunter henshuai
 * c:/b.txt i love delireba
 *
 * InputFormat (custom, carrying the file path)
 */
// 1. Create the custom InputFormat.
// Why NullWritable and BytesWritable? The key is left empty for now; at the map output
// stage the key type is switched to Text (the file path) while the value stays BytesWritable.
public class FuncFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // do not split the original files: one file = one split
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FuncRecordReader recordReader = new FuncRecordReader();
        return recordReader;
    }
}
2: Writing the custom RecordReader class
package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @author Dawn
 * @date 2019-05-09 23:12:03
 * @version 1.0
 */
// 2. Write the RecordReader: it emits the whole file as a single key/value pair.
public class FuncRecordReader extends RecordReader<NullWritable, BytesWritable> {

    boolean isProcess = false;
    FileSplit split;
    Configuration conf;
    BytesWritable value = new BytesWritable();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        // keep the split
        this.split = (FileSplit) split;
        // keep the job configuration
        conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!isProcess) {
            // 1. allocate a buffer as long as the split, i.e. the whole file
            byte[] buf = new byte[(int) split.getLength()];
            FSDataInputStream fis = null;
            try {
                // 2. get the file path
                Path path = split.getPath();
                // 3. get the file system for that path
                FileSystem fs = path.getFileSystem(conf);
                // 4. open the input stream
                fis = fs.open(path);
                // 5. read the whole file into the buffer
                IOUtils.readFully(fis, buf, 0, buf.length);
                // 6. copy the buffer into the output value
                value.set(buf, 0, buf.length);
            } finally {
                // only close the stream; the FileSystem instance is cached and shared
                IOUtils.closeStream(fis);
            }
            isProcess = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // the single record has either been read or not
        return isProcess ? 1 : 0;
    }

    @Override
    public void close() throws IOException {
    }
}
3: Writing the MR job
Mapper:
package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @author Dawn
 * @date 2019-05-09 23:25:29
 * @version 1.0
 */
public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1. get the split this mapper is processing
        FileSplit split = (FileSplit) context.getInputSplit();
        // 2. get its path
        Path path = split.getPath();
        // 3. the key carries both the path and the file name
        k.set(path.toString());
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(k, value);
    }
}
Reducer:
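The SequenceFileReducer referenced by the driver below is not shown in the original code. A minimal sketch of what it most likely looks like, assuming each file path key carries exactly one value that is simply forwarded to the SequenceFile output:

package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: pass each (file path, file bytes) pair straight through to the output.
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        // each key (file path) corresponds to a single value (the file's bytes)
        for (BytesWritable value : values) {
            context.write(key, value);
        }
    }
}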
Driver:
package it.dawn.YARNPra.自定義.inputformate;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/**
 * @author Dawn
 * @date 2019-05-09 23:32:39
 * @version 1.0
 */
public class SequenceDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. set up the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. set the jar by the driver class
        job.setJarByClass(SequenceDriver.class);

        // 3. set the custom mapper and reducer classes
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // use the custom input format
        job.setInputFormatClass(FuncFileInputFormat.class);
        // write the result as a SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 4. map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 5. reduce (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 6. input path and output path
        FileInputFormat.setInputPaths(job, new Path("f:/temp/inputSelf/*.txt"));
        FileOutputFormat.setOutputPath(job, new Path("f:/temp/inputSelfout1"));

        // 7. submit the job and wait for it to finish
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}
Run results: the input is the set of small .txt files, and the output is the merged SequenceFile (screenshots in the original post). The values are stored as raw BytesWritable bytes, so the output file itself is not very readable.
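Because the values are raw bytes, a small standalone reader makes it easier to check the result. A minimal sketch, assuming the driver's output directory above and the usual part-r-00000 output file name:

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Sketch: print each stored file path and its contents decoded as UTF-8 text.
public class SequenceFileCheck {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumed location of the first reducer's output file
        Path path = new Path("f:/temp/inputSelfout1/part-r-00000");
        try (SequenceFile.Reader reader =
                new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                String contents = new String(value.getBytes(), 0, value.getLength(), StandardCharsets.UTF_8);
                System.out.println(key + " -> " + contents);
            }
        }
    }
}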