hadoop[8] - Processing Data with a Custom Class
If the data being processed has a more complex structure, it is best to define a custom class to carry it between the mapper and the reducer. Such a class must observe a few points:
- implement the org.apache.hadoop.io.Writable interface
- provide a no-argument constructor (Hadoop creates instances via reflection)
- implement the interface's write and readFields methods
- override the toString method (this is what ends up in the text output)
The following traffic-statistics example illustrates all of this.
Add the required dependencies to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>
FlowBean:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upflow;
    private long dflow;
    private long sumflow;

    // A no-arg constructor is required so Hadoop can instantiate the bean via reflection.
    public FlowBean() {}

    public FlowBean(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getUpflow() { return upflow; }
    public void setUpflow(long upflow) { this.upflow = upflow; }
    public long getDflow() { return dflow; }
    public void setDflow(long dflow) { this.dflow = dflow; }
    public long getSumflow() { return sumflow; }
    public void setSumflow(long sumflow) { this.sumflow = sumflow; }

    // Serialization: write the data we need to transfer as a byte stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(dflow);
    }

    // Deserialization: restore each field from the byte stream, in the same order it was written.
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        dflow = in.readLong();
        sumflow = upflow + dflow; // recompute the derived field, since it is not serialized
    }

    @Override
    public String toString() {
        return upflow + "\t" + dflow + "\t" + sumflow;
    }
}
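As a quick sanity check, here is a minimal sketch of a serialization round trip through write/readFields, the same thing Hadoop does when it ships the bean between map and reduce (the class name FlowBeanRoundTrip is just for illustration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1024, 4096);

        // Serialize the bean into a byte array.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance created through the no-arg constructor.
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(restored); // expected: 1024	4096	5120
    }
}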
FlowCountMapper:
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input value is one line of the log file, split on tabs.
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");

        // Phone number is the second field; upstream and downstream traffic are
        // the third- and second-to-last fields.
        String phone = fields[1];
        long upflow = Long.parseLong(fields[fields.length - 3]);
        long dflow = Long.parseLong(fields[fields.length - 2]);

        FlowBean bean = new FlowBean(upflow, dflow);
        context.write(new Text(phone), bean);
    }
}
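For orientation, a sketch of how the mapper's field indices line up against a hypothetical tab-separated record (the real log layout may differ; only the relative positions matter here):

// Hypothetical record: timestamp, phone, IP, host, upflow, dflow, status.
String line = "1363157985066\t13800000000\t10.0.0.1\texample.com\t1024\t4096\t200";
String[] fields = line.split("\t");
String phone = fields[1];                                  // "13800000000"
long upflow  = Long.parseLong(fields[fields.length - 3]);  // 1024
long dflow   = Long.parseLong(fields[fields.length - 2]);  // 4096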
FlowCountReducer:
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Sum the upstream and downstream traffic of all records for one phone number.
        long upflowsum = 0;
        long dflowsum = 0;
        for (FlowBean value : values) {
            upflowsum += value.getUpflow();
            dflowsum += value.getDflow();
        }
        FlowBean bean = new FlowBean(upflowsum, dflowsum);
        context.write(key, bean);
    }
}
FlowCountSubmitter:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSubmitter {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        // The jar containing this class is shipped to the cluster.
        job.setJarByClass(FlowCountSubmitter.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Map output and final output both use Text keys and FlowBean values.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // args[0]: input directory, args[1]: output directory (must not exist yet).
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
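If you want to smoke-test the job on the development machine before touching the cluster, the same setup can be pointed at the local job runner. This is only a sketch, assuming local directories are passed as args; the two configuration keys are standard Hadoop 2.x settings:

Configuration conf = new Configuration();
conf.set("mapreduce.framework.name", "local"); // run map/reduce in-process instead of on YARN
conf.set("fs.defaultFS", "file:///");          // read/write the local file system instead of HDFS
Job job = Job.getInstance(conf);
// ...the rest of the job setup is identical to FlowCountSubmitter above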
Then package the project into a jar and upload it to server 00. On server 00, create the HDFS input directory and upload the data file into the srcdata path. Do not create the output directory (/flow/output) in advance: MapReduce creates it itself, and the job fails if it already exists.
hadoop fs -mkdir -p /flow/srcdata
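A sketch of the packaging and upload steps; the artifact name follows from the jar used in the command below, while flow.log is only a placeholder for the actual data file:
mvn clean package
hadoop fs -put flow.log /flow/srcdata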
Run: hadoop jar hadoop-mapreduce-customer-1.0.jar com.wange.FlowCountSubmitter /flow/srcdata /flow/output. The arguments after the jar are, in order, the fully qualified class containing main, the directory of the files to analyze, and the directory for the analysis results. Once the job finishes, the results can be found in the output directory.
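To inspect the result files once the job has finished (part-r-00000 is the conventional name of the first reducer's output file; the exact names depend on the number of reducers):
hadoop fs -ls /flow/output
hadoop fs -cat /flow/output/part-r-00000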
The YARN web UI is available at http://hadoop-server-00:8088/cluster, and the HDFS web UI at http://hadoop-server-00:50070.