1. 程式人生 > >hadoop深入研究:(十二)——自定義Writable

hadoop深入研究:(十二)——自定義Writable

自定義Writable

hadoop雖然已經實現了一些非常有用的Writable,而且你可以使用他們的組合做很多事情,但是如果你想構造一些更加複雜的結果,你可以自定義Writable來達到你的目的,我們以註釋的方式對自定義Writable進行講解(不許說我只帖程式碼佔篇幅哦,姿勢寫在註釋裡了):

package com.sweetop.styhadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * User: lastsweetop
 * Date: 13-7-17
 * Time: 下午8:50
 * To change this template use File | Settings | File Templates.
 */
public class EmploeeWritable implements WritableComparable<EmploeeWritable>{

    private Text name;
    private Text role;

    /**
     * 必須有預設的構造器皿,這樣Mapreduce方法才能建立物件,然後通過readFields方法從序列化的資料流中讀出進行賦值
     */
    public EmploeeWritable() {
        set(new Text(),new Text());
    }

    public EmploeeWritable(Text name, Text role) {
        set(name,role);
    }

    public void set(Text name,Text role) {
        this.name = name;
        this.role = role;
    }

    public Text getName() {
        return name;
    }

    public Text getRole() {
        return role;
    }

    /**
     * 通過成員物件本身的write方法,序列化每一個成員物件到輸出流中
     * @param dataOutput
     * @throws IOException
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        name.write(dataOutput);
        role.write(dataOutput);
    }

    /**
     * 同上呼叫成員物件本身的readFields方法,從輸入流中反序列化每一個成員物件
     * @param dataInput
     * @throws IOException
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        name.readFields(dataInput);
        role.readFields(dataInput);
    }

    /**
     * implements WritableComparable必須要實現的方法,用於比較  排序
     * @param emploeeWritable
     * @return
     */
    @Override
    public int compareTo(EmploeeWritable emploeeWritable) {
        int cmp = name.compareTo(emploeeWritable.name);
        if(cmp!=0){
            return cmp;
        }
        return role.compareTo(emploeeWritable.role);
    }

    /**
     * MapReduce需要一個分割者(Partitioner)把map的輸出作為輸入分成一塊塊的餵給多個reduce)
     * 預設的是HashPatitioner,他是通過物件的hashcode函式進行分割,所以hashCode的好壞決定
     * 了分割是否均勻,他是一個很關鍵性的方法。
     * @return
     */
    @Override
    public int hashCode() {
        return name.hashCode()*163+role.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if(o instanceof EmploeeWritable){
            EmploeeWritable emploeeWritable=(EmploeeWritable)o;
            return name.equals(emploeeWritable.name) && role.equals(emploeeWritable.role);
        }
        return false;
    }

    /**
     * 如果你想自定義TextOutputformat作為輸出格式時的輸出,你需要重寫toString方法
     * @return
     */
    @Override
    public String toString() {
        return name+"\t"+role;
    }
}
Writable物件是可更改的而且經常被重用,因此儘量避免在write和readFields中分配物件。

自定義RawComparatorWritable

上面的EmploeeWritable已經可以跑的很溜了,但是還是有優化的空間,當作為MapReduce裡的key,需要進行比較時,因為他已經被序列化,想要比較他們,那麼首先要先反序列化成一個物件,然後再呼叫compareTo物件進行比較,但是這樣效率太低了,有沒有可能可以直接比較序列化後的結果呢,答案是肯定的,可以。我們只需要把EmploeeWritable的序列化後的結果拆成成員物件,然後比較成員物件即可,那麼來看程式碼(講解再次寫在註釋裡):
public static class Comparator extends WritableComparator{
        private static final Text.Comparator TEXT_COMPARATOR= new Text.Comparator();

        protected Comparator() {
            super(EmploeeWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                /**
                 * name是Text型別,Text是標準的UTF-8位元組流,
                 * 由一個變長整形開頭表示Text中文字所需要的長度,接下來就是文字本身的位元組陣列
                 * decodeVIntSize返回變長整形的長度,readVInt表示文字位元組陣列的長度,加起來就是第一個成員name的長度
                 */
                int nameL1= WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1,s1);
                int nameL2=WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2,s2);
                //和compareTo方法一樣,先比較name
                int cmp = TEXT_COMPARATOR.compare(b1,s1,nameL1,b2,s2,nameL2);
                if(cmp!=0){
                    return cmp;
                }
                //再比較role
                return TEXT_COMPARATOR.compare(b1,s1+nameL1,l1-nameL1,b2,s2+nameL2,l2-nameL2);
            } catch (IOException e) {
                throw new IllegalArgumentException();
            }
        }

        static {
            //註冊raw comprator,更象是繫結,這樣MapReduce使用EmploeeWritable時就會直接呼叫Comparator
            WritableComparator.define(EmploeeWritable.class,new Comparator());
        }
    }
我們沒有直接去實現RawComparator而是繼承於WritableComparator,因為WritableComparator提供了很多便捷的方法,並且對compare有個預設的實現。寫compare方法時一定要小心謹慎,因為都是在位元組上操作,可以好好參考下原始碼裡的一些Writable中Comparator的寫法,另外多看下WritableUtils也是由必要的,他裡面有很多簡便的方法可以使用。

自定義comparators

有時候,除了預設的comparator,你可能還需要一些自定義的comparator來生成不同的排序佇列,看一下下面這個示例,只比較name,兩個compare是同一意思,都是比較name大小:
public static class NameComparator extends WritableComparator{
        private static final Text.Comparator TEXT_COMPARATOR= new Text.Comparator();

        protected NameComparator() {
            super(EmploeeWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                int nameL1= WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1,s1);
                int nameL2=WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2,s2);
                return TEXT_COMPARATOR.compare(b1,s1,nameL1,b2,s2,nameL2);
            } catch (IOException e) {
                throw new IllegalArgumentException();
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if(a instanceof EmploeeWritable && b instanceof  EmploeeWritable){
                return ((EmploeeWritable)a).name.compareTo(((EmploeeWritable)b).name);
            }
            return super.compare(a,b);
        }
    }


如果我的文章對您有幫助,請用支付寶打賞: