Hadoop MR Comprehensive Case 2: Partitioning, Sorting, and Multi-Directory Output
阿新 • Published: 2021-01-10
MR Comprehensive Case: Good and Bad Reviews
Requirements
We have a set of order review records. The requirement is to separate the orders by review rating and write the data to different output directories. The ninth field of each record indicates the review status: 0 = good review, 1 = medium review, 2 = bad review. The records must be classified by review status, written to separate directories, and within each output sorted by time in descending order.
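For reference, a hypothetical record (field values invented for illustration; real records are tab-separated) carries nine fields in this order: order id, comment, comment extension, goods count, phone number, user name, address, comment status, comment time:

1001	great phone, fast delivery	none	1	13800000000	Tom	Beijing	0	2021-01-05 12:00:00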
Analysis
A custom InputFormat merges the small files (done in step 1 of this case, which produced the SequenceFile read here).
A bean implementing WritableComparable defines the descending-by-time sort order.
A custom Partitioner partitions the data by review rating.
A custom OutputFormat writes each partition to its own directory.
Code Implementation
Mapper
package com.lagou.mr.comment.step2;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// First KV pair: read with SequenceFileInputFormat, so key: Text and value: BytesWritable (these are the types the step-1 SequenceFile was written with)
public class CommentMapper extends Mapper<Text, BytesWritable, CommentBean, NullWritable> {
//key: the file name
//value: the complete content of one file
@Override
protected void map(Text key, BytesWritable value, Context context)
throws IOException, InterruptedException {
//split the file content into lines; use getLength(), since the backing buffer of BytesWritable may be padded
String str = new String(value.getBytes(), 0, value.getLength());
String[] lines = str.split("\n");
for (String line : lines) {
CommentBean commentBean = parseStrToCommentBean(line);
if (null != commentBean) {
context.write(commentBean, NullWritable.get());
}
}
}
//split the line into fields and wrap them in a CommentBean
public CommentBean parseStrToCommentBean(String line) {
if (StringUtils.isNotBlank(line)) {
//每一行進行切分
String[] fields = line.split("\t");
if (fields.length >= 9) {
return new CommentBean(fields[0], fields[1], fields[2],
Integer.parseInt(fields[3]), fields[4], fields[5],
fields[6], Integer.parseInt(fields[7]), fields[8]);
}
return null;
}
return null;
}
}
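As a quick sanity check of the parsing logic, a minimal local sketch (hypothetical input line; assumes the same package as CommentMapper):

// Local smoke test for parseStrToCommentBean; the record below is made up.
public class ParseSmokeTest {
    public static void main(String[] args) {
        CommentMapper mapper = new CommentMapper();
        String line = "1001\tgreat phone\tnone\t1\t13800000000\tTom\tBeijing\t0\t2021-01-05 12:00:00";
        System.out.println(mapper.parseStrToCommentBean(line)); // prints the tab-joined fields
        System.out.println(mapper.parseStrToCommentBean(""));   // null for blank input
    }
}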
Bean
package com.lagou.mr.comment.step2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CommentBean implements WritableComparable<CommentBean> {
private String orderId;
private String comment;
private String commentExt;
private int goodsNum;
private String phoneNum;
private String userName;
private String address;
private int commentStatus;
private String commentTime;
//no-argument constructor (required by the Hadoop deserialization framework)
public CommentBean() {
}
public CommentBean(String orderId, String comment, String commentExt,
int goodsNum, String phoneNum, String userName, String address,
int commentStatus, String commentTime) {
this.orderId = orderId;
this.comment = comment;
this.commentExt = commentExt;
this.goodsNum = goodsNum;
this.phoneNum = phoneNum;
this.userName = userName;
this.address = address;
this.commentStatus = commentStatus;
this.commentTime = commentTime;
}
@Override
public String toString() {
return orderId + "\t" + comment + "\t" + commentExt + "\t" + goodsNum +
"\t" + phoneNum + "\t" + userName + "\t" + address + "\t" +
commentStatus + "\t" + commentTime;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}
public String getCommentExt() {
return commentExt;
}
public void setCommentExt(String commentExt) {
this.commentExt = commentExt;
}
public int getGoodsNum() {
return goodsNum;
}
public void setGoodsNum(int goodsNum) {
this.goodsNum = goodsNum;
}
public String getPhoneNum() {
return phoneNum;
}
public void setPhoneNum(String phoneNum) {
this.phoneNum = phoneNum;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public int getCommentStatus() {
return commentStatus;
}
public void setCommentStatus(int commentStatus) {
this.commentStatus = commentStatus;
}
public String getCommentTime() {
return commentTime;
}
public void setCommentTime(String commentTime) {
this.commentTime = commentTime;
}
//sort rule: descending by comment time (compareTo returns a negative, zero, or positive value)
@Override
public int compareTo(CommentBean o) {
return o.getCommentTime().compareTo(this.commentTime);
}
//serialization
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeUTF(comment);
out.writeUTF(commentExt);
out.writeInt(goodsNum);
out.writeUTF(phoneNum);
out.writeUTF(userName);
out.writeUTF(address);
out.writeInt(commentStatus);
out.writeUTF(commentTime);
}
//deserialization; the field order must match write()
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.comment = in.readUTF();
this.commentExt = in.readUTF();
this.goodsNum = in.readInt();
this.phoneNum = in.readUTF();
this.userName = in.readUTF();
this.address = in.readUTF();
this.commentStatus = in.readInt();
this.commentTime = in.readUTF();
}
}
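Since compareTo compares the time strings lexicographically, this yields a correct descending order as long as the timestamps use a fixed zero-padded format such as yyyy-MM-dd HH:mm:ss. A minimal sketch with hypothetical timestamps:

// Lexicographic comparison equals chronological comparison for this format.
CommentBean earlier = new CommentBean("1", "ok", "-", 1, "-", "-", "-", 0, "2021-01-01 08:00:00");
CommentBean later   = new CommentBean("2", "ok", "-", 1, "-", "-", "-", 0, "2021-01-02 08:00:00");
System.out.println(later.compareTo(earlier) < 0); // true: later records sort first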
Partitioner
package com.lagou.mr.comment.step2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class CommentPartitioner extends Partitioner<CommentBean, NullWritable> {
@Override
public int getPartition(CommentBean commentBean, NullWritable nullWritable,
int numPartitions) {
// return (commentBean.getCommentStatus() & Integer.MAX_VALUE) % numPartitions;
return commentBean.getCommentStatus(); // 0, 1, 2 map directly to the partition numbers
}
}
Summary and Notes
The partitioner decides which partition each record emitted by the Mapper's map method is sent to (by default this is based on the key). Here the review status itself is used as the partition number, so the job must run with exactly three reduce tasks.
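For comparison, Hadoop's default HashPartitioner derives the partition from the key's hash code:

// org.apache.hadoop.mapreduce.lib.partition.HashPartitioner (the framework default)
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // mask the sign bit so negative hash codes still yield a valid partition
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}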
Custom OutputFormat
CommentOutputFormat
package com.lagou.mr.comment.step2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
//KV types of the final output
public class CommentOutputFormat extends FileOutputFormat<CommentBean, NullWritable> {
//returns the object responsible for writing out the data
@Override
public RecordWriter<CommentBean, NullWritable> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
FileSystem fs = FileSystem.get(conf);
//create the output file and stream according to the partition this reduce task handles
//read the output path configured in the Driver; partition 0 = good, 1 = medium, 2 = bad
String outputDir = conf.get(
"mapreduce.output.fileoutputformat.outputdir");
FSDataOutputStream goodOut = null;
FSDataOutputStream commonOut = null;
FSDataOutputStream badOut = null;
int id = job.getTaskAttemptID().getTaskID().getId(); //partition number handled by the current reduce task
if (id == 0) {
//good reviews
goodOut = fs.create(new Path(outputDir + "/good/good.log"));
} else if (id == 1) {
//medium reviews
commonOut = fs.create(new Path(outputDir + "/common/common.log"));
} else {
//bad reviews
badOut = fs.create(new Path(outputDir + "/bad/bad.log"));
}
return new CommentRecordWriter(goodOut, commonOut, badOut);
}
}
Problem and Summary
If the three output streams were all created here unconditionally, every reduce task would create all three files. Because Hadoop does not support append writes, each newly created stream produces a blank file that overwrites the previous one, so only the bad-review file would end up with data. That is why the code above creates only the stream for the partition the current task handles.
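To make the failure mode concrete, this is roughly what the problematic variant would look like (do not use):

// Problematic: every reduce task recreates all three files unconditionally.
// fs.create() truncates existing files, so whichever task runs last leaves
// the files created by the other tasks empty.
goodOut   = fs.create(new Path(outputDir + "/good/good.log"));
commonOut = fs.create(new Path(outputDir + "/common/common.log"));
badOut    = fs.create(new Path(outputDir + "/bad/bad.log"));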
RecordWriter
package com.lagou.mr.comment.step2;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class CommentRecordWriter extends RecordWriter<CommentBean, NullWritable> {
//output streams, one per review type
private FSDataOutputStream goodOut;
private FSDataOutputStream commonOut;
private FSDataOutputStream badOut;
public CommentRecordWriter(FSDataOutputStream goodOut,
FSDataOutputStream commonOut, FSDataOutputStream badOut) {
this.goodOut = goodOut;
this.commonOut = commonOut;
this.badOut = badOut;
}
//write logic: route each record to the output stream that matches its review status
@Override
public void write(CommentBean key, NullWritable value)
throws IOException, InterruptedException {
int commentStatus = key.getCommentStatus();
String beanStr = key.toString();
if (commentStatus == 0) {
goodOut.write(beanStr.getBytes());
goodOut.write("\n".getBytes());
goodOut.flush();
} else if (commentStatus == 1) {
commonOut.write(beanStr.getBytes());
commonOut.write("\n".getBytes());
commonOut.flush();
} else {
badOut.write(beanStr.getBytes());
badOut.write("\n".getBytes());
badOut.flush();
}
}
//release resources
@Override
public void close(TaskAttemptContext context)
throws IOException, InterruptedException {
IOUtils.closeStream(goodOut);
IOUtils.closeStream(commonOut);
IOUtils.closeStream(badOut);
}
}
Reducer
package com.lagou.mr.comment.step2;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CommentReducer extends Reducer<CommentBean, NullWritable, CommentBean, NullWritable> {
@Override
protected void reduce(CommentBean key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
//iterate over the values and write the key; key is a single reused object: as the framework fetches each value, the key's contents change too
for (NullWritable value : values) {
context.write(key, value);
}
}
}
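The reuse note above matters whenever keys are buffered. A hypothetical sketch (not needed in this job) of copying each key before collecting it, using org.apache.hadoop.io.WritableUtils and java.util.ArrayList:

// Buffering keys requires a copy, because the framework reuses one key instance.
List<CommentBean> buffered = new ArrayList<>();
for (NullWritable value : values) {
    buffered.add(WritableUtils.clone(key, context.getConfiguration()));
}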
Driver
package com.lagou.mr.comment.step2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CommentDriver {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "CommentDriver");
job.setJarByClass(CommentDriver.class);
job.setMapperClass(CommentMapper.class);
job.setReducerClass(CommentReducer.class);
job.setMapOutputKeyClass(CommentBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(CommentBean.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(CommentPartitioner.class);
//specify the InputFormat (reads the SequenceFile produced in step 1)
job.setInputFormatClass(SequenceFileInputFormat.class);
//specify the custom OutputFormat
job.setOutputFormatClass(CommentOutputFormat.class);
//input and output paths
FileInputFormat.setInputPaths(job,
new Path("E:\\teach\\hadoop框架\\資料\\data\\mr綜合案例\\out"));
FileOutputFormat.setOutputPath(job,
new Path("E:\\teach\\hadoop框架\\資料\\data\\mr綜合案例\\multi-out"));
//number of reduce tasks; must be 3 to match the partitioner's 0/1/2 output
job.setNumReduceTasks(3);
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1); //exit non-zero if the job failed
}
}
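If everything is wired correctly, the output directory should end up with one subdirectory per review status (layout sketch; contents depend on your data):

multi-out/good/good.log      status 0, descending by time
multi-out/common/common.log  status 1
multi-out/bad/bad.log        status 2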