Hadoop: Converting the MR Output Encoding

If you have ever looked at how Hadoop encodes its plain-text output, you know that TextOutputFormat hard-codes the output charset to UTF-8. The source is as follows:

  
package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;  
import java.io.UnsupportedEncodingException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.fs.FSDataOutputStream;  
  
import org.apache.hadoop.io.NullWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.compress.CompressionCodec;  
import org.apache.hadoop.io.compress.GzipCodec;  
import org.apache.hadoop.mapreduce.OutputFormat;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.util.*;  
  
/** An {@link OutputFormat} that writes plain text files. */  
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {// TextOutputFormat is the default output file format
  protected static class LineRecordWriter<K, V>// the default RecordWriter
    extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8"; // the output charset is hard-coded to UTF-8 here
    private static final byte[] newline;// the line terminator
    static {  
      try {  
        newline = "\n".getBytes(utf8);  
      } catch (UnsupportedEncodingException uee) {  
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
      }  
    }  
  
    protected DataOutputStream out;  
    private final byte[] keyValueSeparator;// separator between key and value; the default is a tab
  
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {// constructor: sets the output stream and the key/value separator
      this.out = out;  
      try {  
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
      } catch (UnsupportedEncodingException uee) {  
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
      }  
    }  
  
    public LineRecordWriter(DataOutputStream out) {// uses the default separator
      this(out, "\t");  
    }  
  
    /** 
     * Write the object to the byte stream, handling Text as a special
     * case (the stream is byte-oriented).
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on 
     */  
    private void writeObject(Object o) throws IOException {// writes one record per line: key, separator, value, newline
      if (o instanceof Text) {// if o is a Text instance
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());// write its raw bytes directly
      } else {  
        out.write(o.toString().getBytes(utf8));  
      }  
    }  
  
    public synchronized void write(K key, V value)// writes are synchronized, so they are mutually exclusive
      throws IOException {
      // the checks below decide whether key and value are effectively empty
      boolean nullKey = key == null || key instanceof NullWritable;// concise null/NullWritable check
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;  
      }  
      if (!nullKey) {  
        writeObject(key);  
      }  
      if (!(nullKey || nullValue)) {  
        out.write(keyValueSeparator);  
      }  
      if (!nullValue) {  
        writeObject(value);  
      }  
      out.write(newline);  
    }  
  
    public synchronized   
    void close(TaskAttemptContext context) throws IOException {  
      out.close();  
    }  
  }  
  
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job// obtains the writer instance
                         ) throws IOException, InterruptedException {  
    Configuration conf = job.getConfiguration();  
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator= conf.get("mapred.textoutputformat.separator",  
                                       "\t");  
    CompressionCodec codec = null;// compression codec, if output compression is enabled
    String extension = "";  
    if (isCompressed) {  
      Class<? extends CompressionCodec> codecClass =   
        getOutputCompressorClass(job, GzipCodec.class);  
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
      extension = codec.getDefaultExtension();  
    }  
    Path file = getDefaultWorkFile(job, extension);// gets the default output file path and name; implemented in FileOutputFormat
    FileSystem fs = file.getFileSystem(conf);  
    if (!isCompressed) {  
      FSDataOutputStream fileOut = fs.create(file, false);  
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
    } else {  
      FSDataOutputStream fileOut = fs.create(file, false);  
      return new LineRecordWriter<K, V>(new DataOutputStream  
                                        (codec.createOutputStream(fileOut)),  
                                        keyValueSeparator);  
    }  
  }  
}  
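Note writeObject() above: for Text values it writes the raw internal bytes straight to the stream, and Text always stores its contents as UTF-8, so those bytes come out as UTF-8 no matter what. A small standalone sketch (the class name is mine, not part of Hadoop) showing why re-encoding has to go through String:

import java.util.Arrays;

import org.apache.hadoop.io.Text;

public class CharsetDemo {
  public static void main(String[] args) throws Exception {
    Text t = new Text("中文日誌");                               // Text always stores UTF-8 internally
    byte[] raw = Arrays.copyOf(t.getBytes(), t.getLength());     // the bytes LineRecordWriter writes for a Text
    byte[] gbk = t.toString().getBytes("GBK");                   // decode to String first, then re-encode
    System.out.println(raw.length + " UTF-8 bytes vs " + gbk.length + " GBK bytes");
  }
}

This is why the modified writeObject() shown below re-encodes every value through toString() instead of writing Text's raw bytes.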
In production, however, the input and output encodings are not always UTF-8. Processed text often has to be written out as GBK, BIG5, or another encoding so it can serve as the input of the next program; in banking in particular, log files are usually GBK. Hard-coding GBK in place of UTF-8 in the source above would cover that one case, but there are many character encodings, and a big-data platform product aimed at customers worldwide cannot be that manual: the approach does not scale and serves only a narrow audience. It would fit the messy requirements of production far better if the charset could be specified in the MR job's settings. The source is therefore modified as follows:
package com.huateng.hadoop.mapred.transcoding.format;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class EncodingOutputFormat<K, V> extends FileOutputFormat<K, V>{
	  
	  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
	  protected static class LineRecordWriter<K, V>
	    extends RecordWriter<K, V>  {
	    private String charset;// manually specified parameter: any output charset can be chosen here
	    private   byte[] newline;
	   
	    protected DataOutputStream out;
	    private final byte[] keyValueSeparator;
// The charset is written into every output line below, so it has to take effect while the MR job runs; it is therefore handed in through the constructor.
	    public LineRecordWriter(DataOutputStream out, String keyValueSeparator,String dsc_charset) {
	      this.out = out;
	      charset=dsc_charset; 
	      try {
	    	  	newline = "\n".getBytes(charset);
	    	  	this.keyValueSeparator = keyValueSeparator.getBytes(charset);
	      } catch (UnsupportedEncodingException uee) {
	    	  	throw new IllegalArgumentException("can't find " + charset + " encoding");
	      }
	    }
	    /**
	     * Write the object to the byte stream, handling Text as a special case.
	     * @param o the object to print
	     * @throws IOException if the write throws, we pass it on
	     */
	    private void writeObject(Object o) throws IOException {
	      // Unlike the stock TextOutputFormat, never write Text's raw bytes
	      // (they are always UTF-8); re-encode every value through String so
	      // the bytes come out in the requested charset.
	      out.write(o.toString().getBytes(charset));
	    }

	    public synchronized void write(K key, V value)
	    		throws IOException {

	      boolean nullKey = key == null || key instanceof NullWritable;
	      boolean nullValue = value == null || value instanceof NullWritable;
	      if (nullKey && nullValue) {
	        return;
	      }
	      if (!nullKey) {
	        writeObject(key);
	      }
	      if (!(nullKey || nullValue)) {
	        out.write(keyValueSeparator);
	      }
	      if (!nullValue) {
	        writeObject(value);
	      }
	      out.write("\n".getBytes());
	    }

	    public synchronized 
	    		void close(TaskAttemptContext context) throws IOException {
	      out.close();
	    }
	  }

	  public RecordWriter<K, V> 
	         getRecordWriter(
	        		 TaskAttemptContext job
	                         ) throws IOException, InterruptedException {
	    Configuration conf = job.getConfiguration();
	    // The charset parameter arrives here: it is read from the MR job's
	    // Configuration, which the driver sets before submission.
	    String dst_charset = job.getConfiguration().get("ark.dsccodec");
	    boolean isCompressed = getCompressOutput(job);
	    String keyValueSeparator= conf.get(SEPERATOR, "\t");
	    CompressionCodec codec = null;
	    String extension = "";
	    if (isCompressed) {
	      Class<? extends CompressionCodec> codecClass = 
	        getOutputCompressorClass(job, GzipCodec.class);
	      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
	      extension = codec.getDefaultExtension();
	    }
//	    setOutputName(job,"transform");
	    Path file = getDefaultWorkFile(job, extension);
	    FileSystem fs = file.getFileSystem(conf);
	    if (!isCompressed) {
	      FSDataOutputStream fileOut = fs.create(file, false);
	      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator,dst_charset);
	    } else {
	      FSDataOutputStream fileOut = fs.create(file, false);
	      return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
	                                				keyValueSeparator,dst_charset);
	    }
	  }
	}
To recap the key changes: LineRecordWriter gains a charset field that can be set to any output charset, and because the value must be available while the MR job runs, it is handed in through the constructor:

	    private String charset;// manually specified: any output charset can be chosen here
	    public LineRecordWriter(DataOutputStream out, String keyValueSeparator, String dsc_charset) {...}

LineRecordWriter itself is constructed by getRecordWriter(), and that is where the specified charset is read from the job:

	    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
	                         throws IOException, InterruptedException {
	      Configuration conf = job.getConfiguration();
	      // the parameter comes in here, read from the MR job's Configuration
	      String dst_charset = job.getConfiguration().get("ark.dsccodec");
	      ...
	    }
The value returned by job.getConfiguration().get("ark.dsccodec") is set when the Job instance is created, which completes the change. I have verified this on a production Huawei FI cluster: every charset the JDK supports can be converted. The driver (main method) settings are as follows:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.google.common.base.Preconditions;
import com.huateng.hadoop.mapred.MapRedAdapter;
import com.huateng.hadoop.mapred.transcoding.format.EncodingOutputFormat;
//import com.huateng.hadoop.mapred.transcoding.format.GB2312OutputFormat;
//import com.huateng.hadoop.mapred.transcoding.format.GBKOutputFormat;
import com.huateng.hdfs.common.HDFSClient;
import com.huateng.util.common.StringUtils;

/*
 * @author canMao
 */
public class TranscodingJob 
{
	String other_code=null;
	private Job internalJob;
	public TranscodingJob(String in_path,String src_charset,
			String out_path,String dst_charset)throws Exception{
		Preconditions.checkArgument(!StringUtils.hasNullOrEmpty(new String[]{src_charset, dst_charset})
				,"source_encoding and destination_encoding must both be non-empty");
		Job job = MapRedAdapter.createJob();
		job.getConfiguration().set("ark.codec", src_charset);
		job.getConfiguration().set("ark.dsccodec", dst_charset);
		job.setJarByClass(TranscodingJob.class);
		job.setMapperClass(TranscodingMapper.class);
		job.setNumReduceTasks(0);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		if (dst_charset.equals("UTF-8")) {
			job.setOutputFormatClass(TextOutputFormat.class);
		}else{
			job.setOutputFormatClass(EncodingOutputFormat.class);
		}
		FileInputFormat.setInputPaths(job, new Path(in_path));
		if (HDFSClient.getFileSystem().exists(new Path(out_path))) {
			HDFSClient.getFileSystem().delete(new Path(out_path),true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out_path));
		internalJob = job;
	}
	
	public boolean submit() throws ClassNotFoundException, IOException, InterruptedException
	{
		return internalJob.waitForCompletion(true);
	}
	
	
}
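The driver above wires in TranscodingMapper, which is not shown in the post. A minimal sketch of what such a mapper might look like, assuming it only re-decodes each raw input line using the source charset carried in ark.codec (the class body here is my own guess, not the original):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TranscodingMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
	private String srcCharset;

	@Override
	protected void setup(Context context) {
		// The driver sets ark.codec to the source file's charset.
		srcCharset = context.getConfiguration().get("ark.codec", "UTF-8");
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// TextInputFormat passes the raw line bytes through untouched, so decode
		// them with the declared source charset, then emit a normal Text
		// (stored as UTF-8) for EncodingOutputFormat to re-encode on write.
		String line = new String(value.getBytes(), 0, value.getLength(), srcCharset);
		context.write(new Text(line), NullWritable.get());
	}
}

Because the job runs with zero reducers, each mapper's output goes straight through the configured output format, where the re-encoding to the target charset happens.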