Hadoop: charset conversion in MapReduce output
Posted 2019-02-04
If you have read the source of Hadoop's text file input/output handling, you know that TextOutputFormat hard-codes the output byte encoding as UTF-8. The source, with annotations, is as follows:
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.*;

/** An {@link OutputFormat} that writes plain text files. */
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { // TextOutputFormat is the default output file format

  protected static class LineRecordWriter<K, V> // the default record writer
      extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8"; // the output charset is hard-coded to UTF-8 here
    private static final byte[] newline; // the line terminator
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator; // separator between key and value, a tab by default

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { // constructor: initializes the output stream and the separator
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) { // default separator
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException { // records are written one line at a time: key, separator, value, \n
      if (o instanceof Text) { // if o is a Text instance, write its raw bytes
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value) // synchronized: writes are mutually exclusive
        throws IOException {
      // determine whether key and value are null
      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }

  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) // obtain a writer instance
      throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");
    CompressionCodec codec = null; // compression codec, used when output compression is enabled
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension); // the default output path and file name, implemented in FileOutputFormat
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }
}

In production, however, the input and output charsets are not always UTF-8. The processed text may need to be written out as GBK, Big5, or the like, to serve as input for the next program; in banking in particular, logs are usually GBK. For a single target encoding, replacing "UTF-8" in the source above with "GBK" is enough. But there are many charsets, and a big-data platform product faces customers all over the world: hard-coding the output encoding is not automated and serves only a very narrow audience. If we could specify the charset in the MR job's settings, that would fit the complex requirements of production perfectly. So the source is modified as follows:
package com.huateng.hadoop.mapred.transcoding.format;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class EncodingOutputFormat<K, V> extends FileOutputFormat<K, V> {

  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

  protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
    private String charset; // user-supplied parameter: any output charset can be specified here
    private byte[] newline;
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    // The charset is written into every output line below, so it must take effect
    // while the MR job is running; it is therefore supplied through the constructor,
    // and getRecordWriter() below reads it from the job configuration.
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator, String dsc_charset) {
      this.out = out;
      charset = dsc_charset;
      try {
        newline = "\n".getBytes(charset);
        this.keyValueSeparator = keyValueSeparator.getBytes(charset);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + charset + " encoding");
      }
    }

    /**
     * Write the object to the byte stream, handling Text as a special case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      // The Text fast path of the original class is removed on purpose:
      // Text.getBytes() always returns UTF-8 bytes, so every object is routed
      // through String and re-encoded in the target charset.
      out.write(o.toString().getBytes(charset));
    }

    public synchronized void write(K key, V value) throws IOException {
      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline); // line terminator encoded with the target charset
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }

  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    // the parameter enters here: read the target charset from the MR job's configuration
    String dst_charset = job.getConfiguration().get("ark.dsccodec");
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator = conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass =
          getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator, dst_charset);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
                                        keyValueSeparator, dst_charset);
    }
  }
}
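To see why the Text fast path has to be bypassed, here is a minimal standalone sketch (the class name and sample string are illustrative, not part of the original post). Text always stores its payload as UTF-8, so writing to.getBytes() directly would emit UTF-8 bytes no matter which charset was requested; routing the value through String re-encodes it:

import org.apache.hadoop.io.Text;

public class TextCharsetDemo {
  public static void main(String[] args) throws Exception {
    Text t = new Text("中文"); // two CJK characters
    System.out.println(t.getLength()); // 6: Text holds UTF-8, 3 bytes per character here
    String s = new String(t.getBytes(), 0, t.getLength(), "UTF-8");
    System.out.println(s.getBytes("GBK").length); // 4: 2 bytes per character in GBK
  }
}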
The key changes, piece by piece. First, the charset becomes an instance field:

private String charset; // user-supplied parameter: any output charset can be specified here

This field is used below whenever an output line is written. So where does the charset come from? It has to take effect while the MR job is running, so it is supplied through the constructor:

public LineRecordWriter(DataOutputStream out, String keyValueSeparator, String dsc_charset) { ... }

protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V>

LineRecordWriter extends the abstract RecordWriter; the concrete charset is obtained in EncodingOutputFormat's getRecordWriter(), which constructs the writer:
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
    throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  // the parameter enters here: read the target charset from the MR job's configuration
  String dst_charset = job.getConfiguration().get("ark.dsccodec");
  // ...
}
The job.getConfiguration().get("ark.dsccodec") lookup is answered by a value set on the Configuration when the Job instance is created, which completes the chain. I have tested this in my production environment (a Huawei FI cluster): every charset the JDK supports can be converted. The main method sets the parameters as follows:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.google.common.base.Preconditions;
import com.huateng.hadoop.mapred.MapRedAdapter;
import com.huateng.hadoop.mapred.transcoding.format.EncodingOutputFormat;
//import com.huateng.hadoop.mapred.transcoding.format.GB2312OutputFormat;
//import com.huateng.hadoop.mapred.transcoding.format.GBKOutputFormat;
import com.huateng.hdfs.common.HDFSClient;
import com.huateng.util.common.StringUtils;
/*
* @author canMao
*/
public class TranscodingJob {

  private Job internalJob;

  public TranscodingJob(String in_path, String src_charset,
                        String out_path, String dst_charset) throws Exception {
    Preconditions.checkArgument(
        !StringUtils.hasNullOrEmpty(new String[]{src_charset, dst_charset}),
        "source_encoding or destination_encoding is null or empty");
    Job job = MapRedAdapter.createJob();
    job.getConfiguration().set("ark.codec", src_charset);    // source charset, for the map side
    job.getConfiguration().set("ark.dsccodec", dst_charset); // target charset, read by EncodingOutputFormat
    job.setJarByClass(TranscodingJob.class);
    job.setMapperClass(TranscodingMapper.class);
    job.setNumReduceTasks(0); // map-only job: records go straight from the mapper to the output format
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // UTF-8 output can use the stock TextOutputFormat; any other charset goes through EncodingOutputFormat
    if (dst_charset.equals("UTF-8")) {
      job.setOutputFormatClass(TextOutputFormat.class);
    } else {
      job.setOutputFormatClass(EncodingOutputFormat.class);
    }
    FileInputFormat.setInputPaths(job, new Path(in_path));
    if (HDFSClient.getFileSystem().exists(new Path(out_path))) {
      HDFSClient.getFileSystem().delete(new Path(out_path), true);
    }
    FileOutputFormat.setOutputPath(job, new Path(out_path));
    internalJob = job;
  }

  public boolean submit() throws ClassNotFoundException, IOException, InterruptedException {
    return internalJob.waitForCompletion(true);
  }
}
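The post does not show TranscodingMapper, which TranscodingJob registers as the map class. As a rough sketch of what it presumably does (the class below is my reconstruction, not the original author's code), the map side only needs to re-decode each line's raw bytes with the source charset from "ark.codec" and emit the result; re-encoding into the target charset is then handled by EncodingOutputFormat:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of a transcoding mapper: decode with the source charset, emit as Text.
public class TranscodingMapper
    extends Mapper<LongWritable, Text, Text, NullWritable> {

  private String srcCharset;

  @Override
  protected void setup(Context context) {
    // "ark.codec" carries the source charset, set in TranscodingJob's constructor
    srcCharset = context.getConfiguration().get("ark.codec", "UTF-8");
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // TextInputFormat copies each line's raw bytes into value without decoding them,
    // so decode here with the actual source charset.
    String line = new String(value.getBytes(), 0, value.getLength(), srcCharset);
    context.write(new Text(line), NullWritable.get());
  }
}

With that in place, a caller would drive the whole conversion with something like (paths and charsets illustrative):

new TranscodingJob("/data/in", "GBK", "/data/out", "BIG5").submit();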