Hadoop通過HCatalog編寫Mapreduce任務訪問hive庫中schema資料
阿新 • • 發佈:2022-05-02
1、dirver
package com.kangaroo.hadoop.drive; import java.util.Map; import java.util.Properties; import com.kangaroo.hadoop.mapper.AggregateMapper; import com.kangaroo.hadoop.reducer.AggregateReducer; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.kangaroo.hadoop.utils.PropertiesUtil; public class DriveMain extends Configured implements Tool { private static final Logger logger = LoggerFactory.getLogger(DriveMain.class); private Configuration conf; private PropertiesUtil propUtil; public DriveMain() { this.conf = new Configuration(); this.propUtil = new PropertiesUtil("configure.properties"); } public int run(String[] args) throws Exception { try { logger.info("MapReduce Job Beginning."); String dbName = args[0]; String tableName = args[1]; String partition = args[2]; String sumField = args[3]; String outPath = args[4]; String partFilter = partitionFormat(partition); logger.info("[Params] dbName:{}; tableName:{}, partition:{}, sumField:{}, outPath:{}, partFilter:{}", dbName, tableName, partition, sumField, outPath, partFilter); this.conf.set("sumField", sumField); this.setMapRedConfiguration(); Job job = this.setJobConfiguration(this.conf); HCatInputFormat.setInput(job, dbName, tableName, partFilter); logger.info("setInput successfully."); FileOutputFormat.setOutputPath(job, new Path(outPath)); logger.info("setOutput successfully."); return (job.waitForCompletion(true) ? 0 : 1); } catch (Exception ex) { logger.error(ex.getMessage()); throw ex; } } private Job setJobConfiguration(Configuration conf) throws Exception { try { logger.info("enter setJobConfiguration"); Job job = Job.getInstance(conf); job.setJarByClass(DriveMain.class); job.setInputFormatClass(HCatInputFormat.class); job.setMapperClass(AggregateMapper.class); job.setReducerClass(AggregateReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(1); logger.info("setJobConfiguration successfully."); return job; } catch (Exception ex) { logger.error("setJobConfiguration: " + ex.getMessage()); throw new Exception(ex); } } private void setMapRedConfiguration() { try { Properties properties = propUtil.getProperties(); logger.info("Load MapReduce Configuration Successfully."); for (Map.Entry entry : properties.entrySet()) { if (entry.getKey().toString().startsWith("mapred")) { conf.set(entry.getKey().toString(), entry.getValue().toString()); logger.info("[MR][Config] key:{}, value:{}", entry.getKey().toString(), entry.getValue().toString()); } } logger.info("[MR][Config] Set MapReduce Configuration Successfully."); } catch (Exception e) { } } private String partitionFormat(String partition) { String format = ""; if(!partition.contains("pt") && ! partition.contains("dt")) { String[] items = partition.split("/"); String[] keys = {"year","month","day", "hour"}; for(int i=0; i<items.length; i++) { if (i == items.length-1) { format += keys[i] + "='" + items[i] + "'"; } else { format += keys[i] + "='" + items[i] + "' and "; } } } else { format = partition; } return format; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new DriveMain(), args); System.exit(exitCode); } }
2、Mapper
package com.kangaroo.hadoop.mapper; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.schema.HCatSchema; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; @SuppressWarnings("rawtypes") public class AggregateMapper extends Mapper<WritableComparable, HCatRecord, Text, Text> { private static final Logger logger = LoggerFactory.getLogger(AggregateMapper.class); private HCatSchema schema; private Text outKey; private Text outValue; private IntWritable one; @Override protected void setup(Context context) throws IOException, InterruptedException { outKey = new Text(); outValue = new Text(); schema = HCatInputFormat.getTableSchema(context.getConfiguration()); } @Override protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException { String sumField = context.getConfiguration().get("sumField"); Map<String, String> recordMap = new HashMap<String, String>(); for (String fieldName : schema.getFieldNames()) { logger.info("fieldName={}", fieldName); String fieldValue = value.get(fieldName, schema).toString(); logger.info("fieldName={}, fieldValue={}", fieldName, fieldValue); recordMap.put(fieldName, fieldValue); logger.info("recordMap={}", recordMap.toString()); } outKey.set(recordMap.get(sumField)); outValue.set("1"); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { context.write(outKey, outValue); } }
3、Reducer
package com.kangaroo.hadoop.reducer; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hive.hcatalog.data.schema.HCatSchema; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @SuppressWarnings("rawtypes") public class AggregateReducer extends Reducer<Text, Text, Text, Text> { protected static final Logger logger = LoggerFactory.getLogger(AggregateReducer.class); HCatSchema schema; Text outKey; Text outValue; @Override protected void setup(Context context) throws IOException, InterruptedException { schema = HCatInputFormat.getTableSchema(context.getConfiguration()); } @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException { outKey.set(key); int sum = 0; for (Text value : values) { sum += Integer.parseInt(value.toString()); } outValue.set(String.valueOf(sum)); } protected void cleanup(Context context) throws IOException, InterruptedException { context.write(outKey, outValue); } }
4、propertyUtil
package com.kangaroo.hadoop.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public class PropertiesUtil {
private String filePath;
public PropertiesUtil() {
this.filePath = "configure.properties";
}
public PropertiesUtil(String filePath) {
this.filePath = filePath;
}
public Properties getProperties() throws IOException {
Properties prop;
InputStream inStream = null;
try {
inStream = PropertiesUtil.class.getClassLoader()
.getResourceAsStream(this.filePath);
prop = new Properties();
prop.load(inStream);
return prop;
} finally {
if (inStream != null)
inStream.close();
}
}
}
5、配置
mapred.job.queue.name=root.XXX
mapred.jar=./XXX.jar
mapred.map.tasks=300
mapred.reduce.tasks=100
#mapred.map.capacity=1
#mapred.reduce.capacity=1
mapred.job.priority=HIGH
mapred.job.name=XXX