hadoop深入研究:(十二)——自定義Writable
阿新 • • 發佈:2019-02-01
自定義Writable
hadoop雖然已經實現了一些非常有用的Writable,而且你可以使用他們的組合做很多事情,但是如果你想構造一些更加複雜的結果,你可以自定義Writable來達到你的目的,我們以註釋的方式對自定義Writable進行講解(不許說我只帖程式碼佔篇幅哦,姿勢寫在註釋裡了):
Writable物件是可更改的而且經常被重用,因此儘量避免在write和readFields中分配物件。package com.sweetop.styhadoop; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * Created with IntelliJ IDEA. * User: lastsweetop * Date: 13-7-17 * Time: 下午8:50 * To change this template use File | Settings | File Templates. */ public class EmploeeWritable implements WritableComparable<EmploeeWritable>{ private Text name; private Text role; /** * 必須有預設的構造器皿,這樣Mapreduce方法才能建立物件,然後通過readFields方法從序列化的資料流中讀出進行賦值 */ public EmploeeWritable() { set(new Text(),new Text()); } public EmploeeWritable(Text name, Text role) { set(name,role); } public void set(Text name,Text role) { this.name = name; this.role = role; } public Text getName() { return name; } public Text getRole() { return role; } /** * 通過成員物件本身的write方法,序列化每一個成員物件到輸出流中 * @param dataOutput * @throws IOException */ @Override public void write(DataOutput dataOutput) throws IOException { name.write(dataOutput); role.write(dataOutput); } /** * 同上呼叫成員物件本身的readFields方法,從輸入流中反序列化每一個成員物件 * @param dataInput * @throws IOException */ @Override public void readFields(DataInput dataInput) throws IOException { name.readFields(dataInput); role.readFields(dataInput); } /** * implements WritableComparable必須要實現的方法,用於比較 排序 * @param emploeeWritable * @return */ @Override public int compareTo(EmploeeWritable emploeeWritable) { int cmp = name.compareTo(emploeeWritable.name); if(cmp!=0){ return cmp; } return role.compareTo(emploeeWritable.role); } /** * MapReduce需要一個分割者(Partitioner)把map的輸出作為輸入分成一塊塊的餵給多個reduce) * 預設的是HashPatitioner,他是通過物件的hashcode函式進行分割,所以hashCode的好壞決定 * 了分割是否均勻,他是一個很關鍵性的方法。 * @return */ @Override public int hashCode() { return name.hashCode()*163+role.hashCode(); } @Override public boolean equals(Object o) { if(o instanceof EmploeeWritable){ EmploeeWritable emploeeWritable=(EmploeeWritable)o; return name.equals(emploeeWritable.name) && role.equals(emploeeWritable.role); } return false; } /** * 如果你想自定義TextOutputformat作為輸出格式時的輸出,你需要重寫toString方法 * @return */ @Override public String toString() { return name+"\t"+role; } }
自定義RawComparatorWritable
上面的EmploeeWritable已經可以跑的很溜了,但是還是有優化的空間,當作為MapReduce裡的key,需要進行比較時,因為他已經被序列化,想要比較他們,那麼首先要先反序列化成一個物件,然後再呼叫compareTo物件進行比較,但是這樣效率太低了,有沒有可能可以直接比較序列化後的結果呢,答案是肯定的,可以。我們只需要把EmploeeWritable的序列化後的結果拆成成員物件,然後比較成員物件即可,那麼來看程式碼(講解再次寫在註釋裡):我們沒有直接去實現RawComparator而是繼承於WritableComparator,因為WritableComparator提供了很多便捷的方法,並且對compare有個預設的實現。寫compare方法時一定要小心謹慎,因為都是在位元組上操作,可以好好參考下原始碼裡的一些Writable中Comparator的寫法,另外多看下WritableUtils也是由必要的,他裡面有很多簡便的方法可以使用。public static class Comparator extends WritableComparator{ private static final Text.Comparator TEXT_COMPARATOR= new Text.Comparator(); protected Comparator() { super(EmploeeWritable.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { /** * name是Text型別,Text是標準的UTF-8位元組流, * 由一個變長整形開頭表示Text中文字所需要的長度,接下來就是文字本身的位元組陣列 * decodeVIntSize返回變長整形的長度,readVInt表示文字位元組陣列的長度,加起來就是第一個成員name的長度 */ int nameL1= WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1,s1); int nameL2=WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2,s2); //和compareTo方法一樣,先比較name int cmp = TEXT_COMPARATOR.compare(b1,s1,nameL1,b2,s2,nameL2); if(cmp!=0){ return cmp; } //再比較role return TEXT_COMPARATOR.compare(b1,s1+nameL1,l1-nameL1,b2,s2+nameL2,l2-nameL2); } catch (IOException e) { throw new IllegalArgumentException(); } } static { //註冊raw comprator,更象是繫結,這樣MapReduce使用EmploeeWritable時就會直接呼叫Comparator WritableComparator.define(EmploeeWritable.class,new Comparator()); } }
自定義comparators
有時候,除了預設的comparator,你可能還需要一些自定義的comparator來生成不同的排序佇列,看一下下面這個示例,只比較name,兩個compare是同一意思,都是比較name大小:public static class NameComparator extends WritableComparator{ private static final Text.Comparator TEXT_COMPARATOR= new Text.Comparator(); protected NameComparator() { super(EmploeeWritable.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int nameL1= WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1,s1); int nameL2=WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2,s2); return TEXT_COMPARATOR.compare(b1,s1,nameL1,b2,s2,nameL2); } catch (IOException e) { throw new IllegalArgumentException(); } } @Override public int compare(WritableComparable a, WritableComparable b) { if(a instanceof EmploeeWritable && b instanceof EmploeeWritable){ return ((EmploeeWritable)a).name.compareTo(((EmploeeWritable)b).name); } return super.compare(a,b); } }
如果我的文章對您有幫助,請用支付寶打賞: