
Controlling the Number and Format of MapReduce Output Files

Controlling MapReduce Multi-File Output

By default, each reduce task of a MapReduce job produces one output file, with a name like part-xxxxx. Sometimes, to make downstream processing of these files easier, for example importing them into different Hive partitions based on the file name, we need to control the file names the reducers produce so that records with the same reduce key are written to the same file. To do this, extend MultipleOutputFormat and override generateFileNameForKeyValue to customize the OutputFormat.

// K and V stand for the job's key and value classes; the key type is assumed to
// be a custom composite key that exposes getDt() and getHour().
public static abstract class MyMultipleOutputFormat extends MultipleOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        // Records with the same date and hour all go to one file, e.g. 20150101_08.
        return String.format("%s_%s", key.getDt(), key.getHour());
    }

    // Inherited as abstract from MultipleOutputFormat; implement it to return the
    // RecordWriter that actually writes each file (see the writers below).
    abstract protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs,
        JobConf job, String name, Progressable progress) throws IOException;
}

This way, logs from the same day and hour are gathered into a single file, which is then easy to load into the corresponding hourly partition.
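
To use the custom format, register it on the job. The following is a minimal old-API (mapred) driver sketch: HourlyTextOutputFormat stands in for a hypothetical concrete subclass of MyMultipleOutputFormat that implements getBaseRecordWriter, and the mapper, reducer and key/value classes are omitted for brevity.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class HourlyLogDriver {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(HourlyLogDriver.class);
        job.setJobName("split-logs-by-hour");
        // HourlyTextOutputFormat is a hypothetical concrete subclass of
        // MyMultipleOutputFormat; mapper/reducer setup is omitted here.
        job.setOutputFormat(HourlyTextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);
        // The output directory then holds one file per (dt, hour), e.g.
        // 20150101_00, 20150101_01, ..., each matching an hourly Hive partition.
    }
}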

Controlling the MapReduce Output Format

Different output file formats differ in both storage and read efficiency, so you can override getBaseRecordWriter to return a writer for a different format.
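
The writers in the sections below extend the new (mapreduce) RecordWriter, so the format choice can also be made in FileOutputFormat.getRecordWriter. The sketch below only illustrates that idea: the class name and the bip.output.file.format configuration key are invented for the example and are not part of the original code.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FormatSwitchingOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getDefaultWorkFile(context, "");
        Class<? extends CompressionCodec> codecClass =
                getOutputCompressorClass(context, DefaultCodec.class);

        // Choose the on-disk format per job: "seq" writes a SequenceFile, "rc" an RCFile.
        String format = conf.get("bip.output.file.format", "seq");
        if ("rc".equals(format)) {
            return new ValRCFileWriter<K, V>(conf, codecClass, path);
        }
        FSDataOutputStream fsout = path.getFileSystem(conf).create(path, false);
        return new ValSequenceFileWriter<K, V>(context, codecClass, fsout);
    }
}

ValORCFileWriter could be added as a third branch in the same way, given the extra table metadata (LogInfo) its constructor needs.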

SequenceFileWriter

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

public class ValSequenceFileWriter<K, V> extends RecordWriter<K, V> {

    private SequenceFile.Writer out = null;
    private FSDataOutputStream fsout;

    public ValSequenceFileWriter(TaskAttemptContext context, Class<?> codecClass, FSDataOutputStream fsout) throws IOException {
        Configuration conf = context.getConfiguration();

        CompressionCodec codec = (CompressionCodec) 
                ReflectionUtils.newInstance(codecClass, conf);
        this.fsout = fsout;
        // Block-compressed SequenceFile with NullWritable keys; values use the job's output value class.
        out = SequenceFile.createWriter(
            conf, fsout,
            NullWritable.class,
            context.getOutputValueClass(),
            CompressionType.BLOCK,
            codec);
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException,
            InterruptedException {
        out.close();
        fsout.close();
    }

    @Override
    public synchronized void write(K key, V value) throws IOException, InterruptedException {
        // The reduce key is discarded; every record is appended as (NullWritable, value).
        out.append(NullWritable.get(), value);
    }
}

RCFileWriter

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.pplive.bip.metadata.hive.HiveTableInfo;

public class ValRCFileWriter<K, V> extends RecordWriter<K, V> {
    protected RCFile.Writer out;

    /*
     * reusable row buffer: one BytesRefWritable entry per output column
     */
    private final BytesRefArrayWritable array;
    private final int numColumns;

    /**
     * construct
     * @param conf
     * @param codecClass
     * @param path
     * @throws IOException
     */
    public ValRCFileWriter(Configuration conf,
            Class<?> codecClass, Path path) throws IOException {

        FileSystem fs = path.getFileSystem(conf);
        CompressionCodec codec = (CompressionCodec) 
            ReflectionUtils.newInstance(codecClass, conf);

        this.out = new RCFile.Writer(fs, conf, path, null, codec);
        numColumns = conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
        this.array = new BytesRefArrayWritable(numColumns);
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        // The value is a delimited row; each field is stored as one RCFile column.
        String[] fields = value.toString().split(HiveTableInfo.FIELD_DELIMITED, -1);
        for (int i = 0; i < fields.length && i < this.numColumns; i++) {
            array.set(i, new BytesRefWritable(fields[i].getBytes("UTF-8")));
        }
        out.append(array);
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException,
            InterruptedException {
        out.close();        
    }
}

ORCFileWriter

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.*;

import com.google.common.base.Throwables;
import com.pplive.bip.metadata.hive.HiveColumnInfo;
import com.pplive.bip.metadata.hive.HiveColumnType;
import com.pplive.bip.metadata.log.LogInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.*;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
import org.apache.hadoop.hive.serde2.typeinfo.*;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.pplive.bip.metadata.hive.HiveTableInfo;

public class ValORCFileWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {

    private final static String MapTypeString = "MAP<STRING,STRING>";

    private final static String ArrayTypeString = "ARRAY<STRING>";

    private  OrcSerde orcSerde;

    private RecordWriter writer;

    private final SettableStructObjectInspector tableInspector;

    private final List<StructField> structFields;

    private final Object orcRow;

    private final int numColumns;

    private static Constructor<? extends RecordWriter> getOrcWriterConstructor()
    {
        try {
            String writerClassName = OrcOutputFormat.class.getName() + "$OrcRecordWriter";
            Constructor<? extends RecordWriter> constructor = OrcOutputFormat.class.getClassLoader()
                    .loadClass(writerClassName).asSubclass(RecordWriter.class)
                    .getDeclaredConstructor(Path.class, OrcFile.WriterOptions.class);
            constructor.setAccessible(true);
            return constructor;
        }
        catch (ReflectiveOperationException e) {
            throw Throwables.propagate(e);
        }
    }

    private static String getHiveType(HiveColumnInfo column) {
        String hiveType;
        HiveColumnType columnType = column.getHiveType();
        if (columnType == HiveColumnType.Map)
            hiveType =  MapTypeString;
        else if (columnType == HiveColumnType.Complex)
            hiveType =  column.getColumnTypeValue();
        else if (columnType == HiveColumnType.Array)
            hiveType =  ArrayTypeString;
        else
            hiveType = column.getHiveType().toString();

        return hiveType.toLowerCase();
    }

    private static ObjectInspector getObjectInspector(String hiveType) {
        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(hiveType.toLowerCase());
        return TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
    }

    /**
     * construct
     * @param conf
     * @param path
     * @throws IOException
     */
    public ValORCFileWriter(Configuration conf, LogInfo logInfo, Path path) throws IOException {
        HiveTableInfo hiveTableInfo = logInfo.getHiveTable();
        List<String> columnNames = new ArrayList<String>();
        List<String> hiveTypeNames = new ArrayList<String>();
        List<ObjectInspector> columnInspectors = new ArrayList<ObjectInspector>();

        for (HiveColumnInfo columnInfo: hiveTableInfo.getHiveColumns()) {
            columnNames.add(columnInfo.getName());
            String hiveType = getHiveType(columnInfo);
            hiveTypeNames.add(hiveType);
            columnInspectors.add(getObjectInspector(hiveType));
        }

        this.tableInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnInspectors);
        this.structFields = (List<StructField>) tableInspector.getAllStructFieldRefs();

        Constructor<? extends RecordWriter> writerConstructor = getOrcWriterConstructor();
        try {
            this.writer = writerConstructor.newInstance(path, OrcFile.writerOptions(conf).inspector(this.tableInspector));
        } catch (ReflectiveOperationException  e) {
            throw new RuntimeException("Failed to create writer", e);
        }
        this.orcSerde = new OrcSerde();
        Properties properties = new Properties();
        properties.setProperty(hive_metastoreConstants.META_TABLE_COLUMNS, StringUtils.join(columnNames, ','));
        properties.setProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, StringUtils.join(hiveTypeNames, ':'));
        this.orcSerde.initialize(conf, properties);
        this.orcRow = tableInspector.create();
        this.numColumns = hiveTableInfo.getHiveColumnCount();
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        String[] fields = value.toString().split(HiveTableInfo.FIELD_DELIMITED, -1);
        for (int i = 0; i < fields.length && i < this.numColumns; i++) {
            StructField sf = structFields.get(i);
            tableInspector.setStructFieldData(orcRow, sf, getJavaObject(fields[i], sf.getFieldObjectInspector()));
        }
        this.writer.write(this.orcSerde.serialize(orcRow, tableInspector));
    }

    // Converts a raw string field into the Java object expected by the given
    // ObjectInspector: list elements are comma-separated, map entries and struct
    // fields are colon-separated; empty or unparsable numeric values become null.
    private Object getJavaObject(String value, ObjectInspector oi) {
        Class clazz = oi.getClass();
        if (value.isEmpty()) {
            return null;
        }
        Object o;
        try {
            if (clazz == JavaShortObjectInspector.class) {
                o = new Short(value);
            } else if (clazz == JavaIntObjectInspector.class) {
                o = new Integer(value);
            }  else if (clazz == JavaLongObjectInspector.class) {
                o = new Long(value);
            } else if (clazz == JavaStringObjectInspector.class) {
                o = value;
            } else if (clazz == JavaFloatObjectInspector.class) {
                o = new Float(value);
            } else if (clazz == JavaDoubleObjectInspector.class) {
                o = new Double(value);
            } else if (clazz == StandardListObjectInspector.class) {
                ObjectInspector elementObjectInspector = ((StandardListObjectInspector)oi).getListElementObjectInspector();
                String[] vs = value.split(",");
                List l  = new ArrayList();
                for(String v: vs) {
                    l.add(getJavaObject(v,elementObjectInspector));
                }
                o = l;
            } else if (clazz == StandardMapObjectInspector.class) {
                ObjectInspector keyObjectInspector = ((StandardMapObjectInspector)oi).getMapKeyObjectInspector();
                ObjectInspector valueObjectInspector = ((StandardMapObjectInspector)oi).getMapValueObjectInspector();
                Map m = new HashMap();
                if(!value.isEmpty()) {
                    String[] vs = value.split(",");
                    for(String v: vs) {
                        String[] kv = v.split(":");
                        if(kv.length == 2) {
                            m.put(getJavaObject(kv[0],keyObjectInspector), getJavaObject(kv[1],valueObjectInspector));
                        }
                    }
                }
                o = m;
            } else if (clazz == StandardStructObjectInspector.class) {
                StandardStructObjectInspector soi = (StandardStructObjectInspector)oi;
                List<StructField> fields = (List<StructField>) soi.getAllStructFieldRefs();
                ArrayList result = (ArrayList)soi.create();
                String[] vs = value.split(":");
                if(vs.length == fields.size()) {
                    for (int i = 0; i < fields.size(); i++) {
                        StructField structField = fields.get(i);
                        soi.setStructFieldData(result,structField,getJavaObject(vs[i], structField.getFieldObjectInspector()));
                    }
                }
                o = result;
            } else {
                throw new RuntimeException("invalid object ObjectInspector" + oi.toString());
            }
        }catch (NumberFormatException e) {
            o = null;
        }

        return o;
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException, InterruptedException {
        writer.close(false);
    }
}


Query efficiency of the data produced by these three writers increases in the order shown; ORC is the most commonly used and the most efficient.