Controlling the Number and Format of MapReduce Output Files
Controlling MapReduce Multi-File Output
By default, when a MapReduce job finishes, each reduce produces one output file with a name like part-xxxxx. Sometimes, to simplify downstream handling of these files, for example importing them into different Hive partitions based on the file name, we need to control the names of the reduce output files so that records with the same reduce key end up in the same file. To do this, subclass MultipleOutputFormat and override generateFileNameForKeyValue to customize the OutputFormat.
public static abstract class MyMultipleOutputFormat<K, V> extends MultipleOutputFormat<K, V> {

    // K is assumed to be a custom key class exposing getDt() and getHour()
    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        // group records of the same day and hour into one output file, e.g. "20190206_13"
        return String.format("%s_%s", key.getDt(), key.getHour());
    }

    // inherited abstract method: a concrete subclass decides which RecordWriter each file uses
    protected abstract RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
            String name, Progressable progress) throws IOException;
}
This way, log records from the same day and hour are collected into the same file, which makes it easy to load them into the corresponding hourly partition.
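For a concrete, self-contained variant of the same idea, the stock MultipleTextOutputFormat from the old mapred API can be used, since it already implements getBaseRecordWriter and only the file-name mapping has to be supplied. The sketch below is mine rather than the original job's code, and it assumes the reduce key is a Text that already holds the "dt_hour" string:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class HourlyTextOutputFormat extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // the key is assumed to already be formatted as "dt_hour", e.g. "20190206_13"
        return key.toString();
    }
}

Wiring it into a job then only requires jobConf.setOutputFormat(HourlyTextOutputFormat.class).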
Controlling the MapReduce Output Format
Different reduce output file formats differ in both storage and read efficiency, so you can override getBaseRecordWriter to return a writer for a different format; a sketch of wiring such a writer into an OutputFormat follows the SequenceFileWriter listing below.
SequenceFileWriter
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

public class ValSequenceFileWriter<K, V> extends RecordWriter<K, V> {

    private SequenceFile.Writer out = null;
    private FSDataOutputStream fsout;

    public ValSequenceFileWriter(TaskAttemptContext context, Class<?> codecClass,
            FSDataOutputStream fsout) throws IOException {
        Configuration conf = context.getConfiguration();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        this.fsout = fsout;
        // keys are dropped: every record is written as (NullWritable, value), block-compressed
        out = SequenceFile.createWriter(conf, fsout,
                NullWritable.class, context.getOutputValueClass(),
                CompressionType.BLOCK, codec);
    }

    @Override
    public synchronized void write(K key, V value) throws IOException, InterruptedException {
        out.append(NullWritable.get(), value);
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException, InterruptedException {
        out.close();
        fsout.close();
    }
}
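Note that ValSequenceFileWriter (and the two writers that follow) extends the new-API org.apache.hadoop.mapreduce.RecordWriter, so it is returned from an OutputFormat's getRecordWriter rather than from the old-API getBaseRecordWriter shown earlier. The following wrapper is a minimal sketch of that wiring under my own naming (ValSequenceFileOutputFormat), with DefaultCodec as the fallback compression codec:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ValSequenceFileOutputFormat<K, V> extends FileOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // one file per task attempt (e.g. part-r-00000) in the task's work directory
        Path file = getDefaultWorkFile(context, "");
        FSDataOutputStream fsout =
                file.getFileSystem(context.getConfiguration()).create(file, false);
        // use the codec configured for the job, falling back to DefaultCodec
        Class<? extends CompressionCodec> codecClass =
                getOutputCompressorClass(context, DefaultCodec.class);
        return new ValSequenceFileWriter<K, V>(context, codecClass, fsout);
    }
}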
RCFileWriter
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.pplive.bip.metadata.hive.HiveTableInfo;

public class ValRCFileWriter<K, V> extends RecordWriter<K, V> {

    protected RCFile.Writer out;

    /** buffered row: one BytesRefWritable per column */
    private final BytesRefArrayWritable array;
    private final int numColumns;

    /**
     * @param conf       job configuration, expected to carry RCFile.COLUMN_NUMBER_CONF_STR
     * @param codecClass compression codec class
     * @param path       output file path
     * @throws IOException
     */
    public ValRCFileWriter(Configuration conf, Class<?> codecClass, Path path) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        this.out = new RCFile.Writer(fs, conf, path, null, codec);
        // RCFile needs the column count up front; it is read from the configuration
        numColumns = conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
        this.array = new BytesRefArrayWritable(numColumns);
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        // the value is a delimited text row; split it into columns and append as one RCFile row
        String[] fields = value.toString().split(HiveTableInfo.FIELD_DELIMITED, -1);
        for (int i = 0; i < fields.length && i < this.numColumns; i++) {
            array.set(i, new BytesRefWritable(fields[i].getBytes("UTF-8")));
        }
        out.append(array);
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException, InterruptedException {
        out.close();
    }
}
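One detail that is easy to miss with this writer: the column count is read from the configuration inside the constructor, so RCFile.COLUMN_NUMBER_CONF_STR must be set beforehand. Below is a small usage sketch of my own, with a caller-supplied column count and GzipCodec chosen arbitrarily:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

public class RcWriterExample {

    // hypothetical helper: open a ValRCFileWriter for a table with the given column count
    public static ValRCFileWriter<NullWritable, Text> openRcWriter(
            Configuration conf, Path path, int numColumns) throws IOException {
        // must be set before the constructor runs, otherwise numColumns defaults to 0
        conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, numColumns);
        return new ValRCFileWriter<NullWritable, Text>(conf, GzipCodec.class, path);
    }
}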
ORCFileWriter
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.*;
import com.google.common.base.Throwables;
import com.pplive.bip.metadata.hive.HiveColumnInfo;
import com.pplive.bip.metadata.hive.HiveColumnType;
import com.pplive.bip.metadata.log.LogInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.*;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
import org.apache.hadoop.hive.serde2.typeinfo.*;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import com.pplive.bip.metadata.hive.HiveTableInfo;
public class ValORCFileWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
    private final static String MapTypeString = "MAP<STRING,STRING>";
    private final static String ArrayTypeString = "ARRAY<STRING>";

    private OrcSerde orcSerde;
    private RecordWriter writer;
    private final SettableStructObjectInspector tableInspector;
    private final List<StructField> structFields;
    private final Object orcRow;
    private final int numColumns;

    // OrcOutputFormat$OrcRecordWriter is not publicly constructible, so its constructor is obtained via reflection
    private static Constructor<? extends RecordWriter> getOrcWriterConstructor() {
        try {
            String writerClassName = OrcOutputFormat.class.getName() + "$OrcRecordWriter";
            Constructor<? extends RecordWriter> constructor = OrcOutputFormat.class.getClassLoader()
                    .loadClass(writerClassName).asSubclass(RecordWriter.class)
                    .getDeclaredConstructor(Path.class, OrcFile.WriterOptions.class);
            constructor.setAccessible(true);
            return constructor;
        } catch (ReflectiveOperationException e) {
            throw Throwables.propagate(e);
        }
    }
    private static String getHiveType(HiveColumnInfo column) {
        String hiveType;
        HiveColumnType columnType = column.getHiveType();
        if (columnType == HiveColumnType.Map) {
            hiveType = MapTypeString;
        } else if (columnType == HiveColumnType.Complex) {
            hiveType = column.getColumnTypeValue();
        } else if (columnType == HiveColumnType.Array) {
            hiveType = ArrayTypeString;
        } else {
            hiveType = column.getHiveType().toString();
        }
        return hiveType.toLowerCase();
    }

    private static ObjectInspector getObjectInspector(String hiveType) {
        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(hiveType.toLowerCase());
        return TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo);
    }
    /**
     * Builds the ORC writer from the Hive table metadata attached to the log definition.
     *
     * @param conf    job configuration
     * @param logInfo log definition carrying the target Hive table schema
     * @param path    output file path
     * @throws IOException
     */
    public ValORCFileWriter(Configuration conf, LogInfo logInfo, Path path) throws IOException {
        HiveTableInfo hiveTableInfo = logInfo.getHiveTable();
        List<String> columnNames = new ArrayList<String>();
        List<String> hiveTypeNames = new ArrayList<String>();
        List<ObjectInspector> columnInspectors = new ArrayList<ObjectInspector>();
        for (HiveColumnInfo columnInfo : hiveTableInfo.getHiveColumns()) {
            columnNames.add(columnInfo.getName());
            String hiveType = getHiveType(columnInfo);
            hiveTypeNames.add(hiveType);
            columnInspectors.add(getObjectInspector(hiveType));
        }
        this.tableInspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnInspectors);
        this.structFields = (List<StructField>) tableInspector.getAllStructFieldRefs();

        Constructor<? extends RecordWriter> writerConstructor = getOrcWriterConstructor();
        try {
            this.writer = writerConstructor.newInstance(path, OrcFile.writerOptions(conf).inspector(this.tableInspector));
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to create writer", e);
        }

        this.orcSerde = new OrcSerde();
        Properties properties = new Properties();
        properties.setProperty(hive_metastoreConstants.META_TABLE_COLUMNS, StringUtils.join(columnNames, ','));
        properties.setProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, StringUtils.join(hiveTypeNames, ':'));
        this.orcSerde.initialize(conf, properties);

        this.orcRow = tableInspector.create();
        this.numColumns = hiveTableInfo.getHiveColumnCount();
    }
    @Override
    public synchronized void write(K key, V value) throws IOException {
        // the value is a delimited text row; convert each field to its Hive Java type and serialize as one ORC row
        String[] fields = value.toString().split(HiveTableInfo.FIELD_DELIMITED, -1);
        for (int i = 0; i < fields.length && i < this.numColumns; i++) {
            StructField sf = structFields.get(i);
            tableInspector.setStructFieldData(orcRow, sf, getJavaObject(fields[i], sf.getFieldObjectInspector()));
        }
        this.writer.write(this.orcSerde.serialize(orcRow, tableInspector));
    }
    /**
     * Converts one raw text field into the Java object expected by the given ObjectInspector.
     * Collection types use a flattened text encoding: list elements are comma-separated,
     * map entries are comma-separated "key:value" pairs, and struct fields are colon-separated.
     * Empty or unparseable values become null.
     */
    private Object getJavaObject(String value, ObjectInspector oi) {
        Class clazz = oi.getClass();
        if (value.isEmpty()) {
            return null;
        }
        Object o;
        try {
            if (clazz == JavaShortObjectInspector.class) {
                o = Short.valueOf(value);
            } else if (clazz == JavaIntObjectInspector.class) {
                o = Integer.valueOf(value);
            } else if (clazz == JavaLongObjectInspector.class) {
                o = Long.valueOf(value);
            } else if (clazz == JavaStringObjectInspector.class) {
                o = value;
            } else if (clazz == JavaFloatObjectInspector.class) {
                o = Float.valueOf(value);
            } else if (clazz == JavaDoubleObjectInspector.class) {
                o = Double.valueOf(value);
            } else if (clazz == StandardListObjectInspector.class) {
                ObjectInspector elementObjectInspector = ((StandardListObjectInspector) oi).getListElementObjectInspector();
                String[] vs = value.split(",");
                List l = new ArrayList();
                for (String v : vs) {
                    l.add(getJavaObject(v, elementObjectInspector));
                }
                o = l;
            } else if (clazz == StandardMapObjectInspector.class) {
                ObjectInspector keyObjectInspector = ((StandardMapObjectInspector) oi).getMapKeyObjectInspector();
                ObjectInspector valueObjectInspector = ((StandardMapObjectInspector) oi).getMapValueObjectInspector();
                Map m = new HashMap();
                if (!value.isEmpty()) {
                    String[] vs = value.split(",");
                    for (String v : vs) {
                        String[] kv = v.split(":");
                        if (kv.length == 2) {
                            m.put(getJavaObject(kv[0], keyObjectInspector), getJavaObject(kv[1], valueObjectInspector));
                        }
                    }
                }
                o = m;
            } else if (clazz == StandardStructObjectInspector.class) {
                StandardStructObjectInspector soi = (StandardStructObjectInspector) oi;
                List<StructField> fields = (List<StructField>) soi.getAllStructFieldRefs();
                ArrayList result = (ArrayList) soi.create();
                String[] vs = value.split(":");
                if (vs.length == fields.size()) {
                    for (int i = 0; i < fields.size(); i++) {
                        StructField structField = fields.get(i);
                        soi.setStructFieldData(result, structField, getJavaObject(vs[i], structField.getFieldObjectInspector()));
                    }
                }
                o = result;
            } else {
                throw new RuntimeException("unsupported ObjectInspector " + oi.toString());
            }
        } catch (NumberFormatException e) {
            // non-numeric text in a numeric column becomes null rather than failing the task
            o = null;
        }
        return o;
    }
    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException, InterruptedException {
        writer.close(false); // false: normal close, not an abort
    }
}
Data written by these three writers becomes increasingly efficient to query in the order presented; ORC is the most commonly used format and also the most efficient.