MapReduce 例項 - 分組排重(group by distinct)
阿新 • 發佈:2019-01-03
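需要實現以下幾個類。程式碼較多,下面只列出主要部分;可根據排重資料的特徵判斷是否需要新增 combiner 來提速。整體思路:map 輸出的 key 依序包含 channelid、scope、type 與排重屬性 attr(uv/ip)等欄位;GroupComparator 只比較前 16 個位元組(channelid、scope、type)來分組,MyPartitioner 依 attr 分區,reduce 中再對排序後相鄰的 attr 去重計數。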
public class GroupComparator implements RawComparator<MyBinaryKey> { @Override public int compare(MyBinaryKey o1, MyBinaryKey o2) { return o1.toString().compareTo(o2.toString()); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 2, b2, s2, Long.SIZE / 8 + Integer.SIZE / 8 * 2); } } public abstract class UVBinaryKey extends BinaryComparable implements WritableComparable<BinaryComparable>{ //根據需要新增屬性,get set方法 @Override public void readFields(DataInput in) throws IOException { this.channelid = in.readLong(); this.scope = in.readInt(); this.type = in.readInt(); int attrlen = in.readInt(); attr = new byte[attrlen]; in.readFully(attr); this.vv = in.readInt(); this.vedioends = in.readInt(); this.playtime = in.readInt(); } @Override public byte[] getBytes() { ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutput out = new DataOutputStream(buf); try { out.writeLong(this.channelid); out.writeInt(this.scope); out.writeInt(this.type); out.writeInt(this.attr.length);//需要新增比較屬性長度,避免比較時屬性長度不等且前面相同是匹配為相同 out.write(this.attr); out.writeInt(this.vv); out.writeInt(this.vedioends); out.writeInt(this.playtime); return buf.toByteArray(); } catch (IOException e) { e.printStackTrace(); } return null; } } public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> { /** * 根據uv/ip取模分割槽,保證相同uv/ip落在同一分割槽 */ @Override public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) { int k=0; for(byte b : key.getAttr()){ k+=b&0xff; } return k%numPartitions; } } job.setMapOutputKeyClass(UVBinaryKey.class); job.setGroupingComparatorClass(GroupComparator.class); job.setPartitionerClass(MyPartitioner.class); map略
combiner(根據需要新增) reduce中的實現: @Override protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { long count = 0; byte[] tbsign = null; for (NullWritable nullWritable : values) { byte[] attr = key.getAttr(); if (tbsign == null) { tbsign = attr; count++; } if (tbsign != null) { if (tbsign.length != attr.length) { count++; tbsign = attr; } else { for (int i = 0; i < tbsign.length; i++) { if (tbsign[i] != attr[i]) { count++; tbsign = attr; break; } } } } } StringBuffer out = new StringBuffer(); out.append(new String(key.getChannelId())) .append(Constants.FIELDS_TERMINATED).append(count); context.write(new Text(out.toString()), NullWritable.get()); }