
Hadoop MR Case Study 2: Partitioned, Sorted, Multi-Directory Output

Tags: hadoop, Lagou notes

MR Case Study: Good and Bad Reviews


Requirement
We have a set of order review data. The task: separate the orders into good, medium, and bad reviews and write them to different output directories. The data looks like the sample below; the ninth field of the source data marks the review type: 0 = good review, 1 = medium review, 2 = bad review. Classify the records by review type, write each class to its own directory, and sort the records by time in descending order.
[Figure: sample rows of the raw review data]
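The screenshot is not reproduced here. As a made-up illustration (the field order follows the CommentBean constructor shown later: orderId, comment, commentExt, goodsNum, phoneNum, userName, address, commentStatus, commentTime), one tab-separated row might look like:

order1001 \t Great phone, fast delivery \t ext1 \t 1 \t 138****0001 \t userA \t Beijing \t 0 \t 2021-03-05 12:30:00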

Analysis

Use a custom InputFormat to merge the small input files (a sketch of this step follows below)
Use a custom partitioner to split the records into partitions by review level

Use a custom OutputFormat to write the data out to multiple directories
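
Step 1 (merging the small files into a SequenceFile) is not shown in this post, so here is a minimal sketch of that pattern under assumed names (WholeFileInputFormat, package step1): each small file becomes one (file name, file bytes) record, and writing those records with SequenceFileOutputFormat yields the (Text, BytesWritable) SequenceFile that CommentMapper consumes below.

package com.lagou.mr.comment.step1;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;


public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false; //read each small file as a single record
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
        InputSplit split, TaskAttemptContext context) {
        return new RecordReader<Text, BytesWritable>() {
            private FileSplit fileSplit;
            private TaskAttemptContext ctx;
            private boolean processed = false;
            private final Text key = new Text();
            private final BytesWritable value = new BytesWritable();

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context) {
                this.fileSplit = (FileSplit) split;
                this.ctx = context;
            }

            @Override
            public boolean nextKeyValue() throws IOException {
                if (processed) {
                    return false;
                }
                //read the whole file into memory in one shot
                Path path = fileSplit.getPath();
                byte[] contents = new byte[(int) fileSplit.getLength()];
                FileSystem fs = path.getFileSystem(ctx.getConfiguration());
                FSDataInputStream in = null;
                try {
                    in = fs.open(path);
                    IOUtils.readFully(in, contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                key.set(path.getName()); //key: the file name
                value.set(contents, 0, contents.length); //value: the file bytes
                processed = true;
                return true;
            }

            @Override
            public Text getCurrentKey() {
                return key;
            }

            @Override
            public BytesWritable getCurrentValue() {
                return value;
            }

            @Override
            public float getProgress() {
                return processed ? 1.0f : 0.0f;
            }

            @Override
            public void close() {
            }
        };
    }
}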


Code Implementation


Mapper

package com.lagou.mr.comment.step2;

import org.apache.commons.lang3.StringUtils;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


//input kv pair: read with SequenceFileInputFormat, so key: Text, value: BytesWritable
//(these are the types the SequenceFile generated in step 1 was written with)
public class CommentMapper extends Mapper<Text, BytesWritable, CommentBean, NullWritable> {
    //key: the file name
    //value: the complete content of one file
    @Override
    protected void map(Text key, BytesWritable value, Context context)
        throws IOException, InterruptedException {
        //split the file content into lines; bound by getLength() because
        //BytesWritable's backing array may be longer than the actual payload
        String str = new String(value.getBytes(), 0, value.getLength());
        String[] lines = str.split("\n");

        for (String line : lines) {
            CommentBean commentBean = parseStrToCommentBean(line);
            if (null != commentBean) {
                context.write(commentBean, NullWritable.get());
            }
        }
    }

    //split one line into fields and wrap them in a CommentBean
    public CommentBean parseStrToCommentBean(String line) {
        if (StringUtils.isNotBlank(line)) {
            //split the line on tabs
            String[] fields = line.split("\t");
            if (fields.length >= 9) {
                return new CommentBean(fields[0], fields[1], fields[2],
                    Integer.parseInt(fields[3]), fields[4], fields[5],
                    fields[6], Integer.parseInt(fields[7]), fields[8]);
            }
            return null;
        }
        return null;
    }
}

Bean

package com.lagou.mr.comment.step2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class CommentBean implements WritableComparable<CommentBean> {
    private String orderId;
    private String comment;
    private String commentExt;
    private int goodsNum;
    private String phoneNum;
    private String userName;
    private String address;
    private int commentStatus;
    private String commentTime;

    //no-arg constructor, required so Hadoop can instantiate the bean during deserialization
    public CommentBean() {
    }

    public CommentBean(String orderId, String comment, String commentExt,
        int goodsNum, String phoneNum, String userName, String address,
        int commentStatus, String commentTime) {
        this.orderId = orderId;
        this.comment = comment;
        this.commentExt = commentExt;
        this.goodsNum = goodsNum;
        this.phoneNum = phoneNum;
        this.userName = userName;
        this.address = address;
        this.commentStatus = commentStatus;
        this.commentTime = commentTime;
    }

    @Override
    public String toString() {
        return orderId + "\t" + comment + "\t" + commentExt + "\t" + goodsNum +
        "\t" + phoneNum + "\t" + userName + "\t" + address + "\t" +
        commentStatus + "\t" + commentTime;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getComment() {
        return comment;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }

    public String getCommentExt() {
        return commentExt;
    }

    public void setCommentExt(String commentExt) {
        this.commentExt = commentExt;
    }

    public int getGoodsNum() {
        return goodsNum;
    }

    public void setGoodsNum(int goodsNum) {
        this.goodsNum = goodsNum;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public int getCommentStatus() {
        return commentStatus;
    }

    public void setCommentStatus(int commentStatus) {
        this.commentStatus = commentStatus;
    }

    public String getCommentTime() {
        return commentTime;
    }

    public void setCommentTime(String commentTime) {
        this.commentTime = commentTime;
    }

    //sort rule: descending by comment time (returns a positive, zero, or negative value)
    @Override
    public int compareTo(CommentBean o) {
        return o.getCommentTime().compareTo(this.commentTime);
    }

    //serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(comment);
        out.writeUTF(commentExt);
        out.writeInt(goodsNum);
        out.writeUTF(phoneNum);
        out.writeUTF(userName);
        out.writeUTF(address);
        out.writeInt(commentStatus);
        out.writeUTF(commentTime);
    }

    //deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.comment = in.readUTF();
        this.commentExt = in.readUTF();
        this.goodsNum = in.readInt();
        this.phoneNum = in.readUTF();
        this.userName = in.readUTF();
        this.address = in.readUTF();
        this.commentStatus = in.readInt();
        this.commentTime = in.readUTF();
    }
}
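
A quick sanity check of the descending order (timestamps invented for illustration; note that lexicographic string comparison matches chronological order only because the timestamps are fixed-width and zero-padded):

CommentBean a = new CommentBean();
a.setCommentTime("2021-03-01 08:00:00");
CommentBean b = new CommentBean();
b.setCommentTime("2021-03-02 08:00:00");
//positive result: a sorts after the newer b, i.e. newest records come first
System.out.println(a.compareTo(b) > 0); // prints true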


Partitioner

package com.lagou.mr.comment.step2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;


public class CommentPartitioner extends Partitioner<CommentBean, NullWritable> {
    @Override
    public int getPartition(CommentBean commentBean, NullWritable nullWritable,
        int numPartitions) {
        // return (commentBean.getCommentStatus() & Integer.MAX_VALUE) % numPartitions; 
        return commentBean.getCommentStatus(); //0, 1, 2 map directly to partition numbers
    }
}

Summary and Notes

The partitioner decides which partition each record emitted from the Mapper's map method is sent to. By default this decision is based on the key, as the sketch below shows.
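
For comparison, this is the behavior the job replaces: Hadoop's stock HashPartitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner) routes each record by the key's hash:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        //clear the sign bit so the result is non-negative, then mod by the task count
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Hashing the whole CommentBean key would scatter the three review levels across partitions, which is why this job routes on commentStatus alone.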


Custom OutputFormat

CommentOutputFormat

package com.lagou.mr.comment.step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


//the final output kv types 
public class CommentOutputFormat extends FileOutputFormat<CommentBean, NullWritable> {
    //returns the object responsible for writing out the data 
    @Override
    public RecordWriter<CommentBean, NullWritable> getRecordWriter(
        TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        FileSystem fs = FileSystem.get(conf);

        //create the file and obtain the output stream for the partition this reduce task handles 
        //the output path was set in the Driver; partition 0 = good, 1 = medium, 2 = bad 
        String outputDir = conf.get(
                "mapreduce.output.fileoutputformat.outputdir");
        FSDataOutputStream goodOut = null;
        FSDataOutputStream commonOut = null;
        FSDataOutputStream badOut = null;
        int id = job.getTaskAttemptID().getTaskID().getId(); //the partition number this reduce task handles 

        if (id == 0) {
            //good reviews 
            goodOut = fs.create(new Path(outputDir + "/good/good.log"));
        } else if (id == 1) {
            //medium reviews
            commonOut = fs.create(new Path(outputDir + "/common/common.log"));
        } else {
            //bad reviews
            badOut = fs.create(new Path(outputDir + "/bad/bad.log"));
        }

        return new CommentRecordWriter(goodOut, commonOut, badOut);
    }
}

Problem and Summary

[Figure: the earlier version of getRecordWriter, which created all three output streams unconditionally]
Creating the output streams unconditionally like that makes every task create all three streams. Because this setup does not support appending to an existing file, each fresh create() produces an empty file that overwrites what a previous task wrote, so in the end only the bad-review file contains data.
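
A reconstruction of that problematic body (the screenshot is not reproduced here, so this is inferred from the description above; FileSystem.create overwrites by default):

        //problematic: every reduce task executes all three creates, so each
        //task's create() truncates the files the other tasks write to
        FSDataOutputStream goodOut = fs.create(new Path(outputDir + "/good/good.log"));
        FSDataOutputStream commonOut = fs.create(new Path(outputDir + "/common/common.log"));
        FSDataOutputStream badOut = fs.create(new Path(outputDir + "/bad/bad.log"));
        return new CommentRecordWriter(goodOut, commonOut, badOut);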

RecordWriter

package com.lagou.mr.comment.step2;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;


public class CommentRecordWriter extends RecordWriter<CommentBean, NullWritable> {
    //the output streams used to write the data 
    private FSDataOutputStream goodOut;
    private FSDataOutputStream commonOut;
    private FSDataOutputStream badOut;

    public CommentRecordWriter(FSDataOutputStream goodOut,
        FSDataOutputStream commonOut, FSDataOutputStream badOut) {
        this.goodOut = goodOut;
        this.commonOut = commonOut;
        this.badOut = badOut;
    }

    //route each record to a different directory according to its review type 
    //the actual write logic 
    @Override
    public void write(CommentBean key, NullWritable value)
        throws IOException, InterruptedException {
        int commentStatus = key.getCommentStatus();
        String beanStr = key.toString();

        if (commentStatus == 0) {
            goodOut.write(beanStr.getBytes());
            goodOut.write("\n".getBytes());
            goodOut.flush();
        } else if (commentStatus == 1) {
            commonOut.write(beanStr.getBytes());
            commonOut.write("\n".getBytes());
            commonOut.flush();
        } else {
            badOut.write(beanStr.getBytes());
            badOut.write("\n".getBytes());
            badOut.flush();
        }
    }

    //release resources 
    @Override
    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
        IOUtils.closeStream(goodOut);
        IOUtils.closeStream(commonOut);
        IOUtils.closeStream(badOut);
    }
}


Reducer

package com.lagou.mr.comment.step2;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class CommentReducer extends Reducer<CommentBean, NullWritable, CommentBean, NullWritable> {
    @Override
    protected void reduce(CommentBean key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {
        //iterate over values and emit the key; key is a reused reference: as the framework advances the values iterator it refills the key's fields in place, so each original record gets written 
        for (NullWritable value : values) {
            context.write(key, value);
        }
    }
}


Driver

package com.lagou.mr.comment.step2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class CommentDriver {
    public static void main(String[] args)
        throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CommentDriver");
        job.setJarByClass(CommentDriver.class);
        job.setMapperClass(CommentMapper.class);
        job.setReducerClass(CommentReducer.class);
        job.setMapOutputKeyClass(CommentBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(CommentBean.class);
        job.setOutputValueClass(NullWritable.class);
        job.setPartitionerClass(CommentPartitioner.class);
        //specify the InputFormat type (the SequenceFile produced by step 1) 
        job.setInputFormatClass(SequenceFileInputFormat.class);
        //specify the custom OutputFormat type 
        job.setOutputFormatClass(CommentOutputFormat.class);
        //specify the input and output paths 
        FileInputFormat.setInputPaths(job,
            new Path("E:\\teach\\hadoop框架\\資料\\data\\mr綜合案例\\out"));
        FileOutputFormat.setOutputPath(job,
            new Path("E:\\teach\\hadoop框架\\資料\\data\\mr綜合案例\\multi-out"));
        //set the number of reduce tasks; it must match the three partitions (0, 1, 2) 
        job.setNumReduceTasks(3);

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}