
Hadoop Series: Using Aggregate

1. Introduction to aggregate
Aggregate is a package provided by Hadoop for common computation and aggregation tasks.
Generally speaking, in order to implement an application using the Map/Reduce model, the developer needs to implement Map and Reduce functions (and possibly a Combine function). However, for a lot of applications related to counting and statistics computing, these functions have very similar characteristics. The Aggregate package implements those patterns. In particular, the package provides a generic mapper class, a reducer class and a combiner class, and a set of built-in value aggregators. It also provides a generic utility class, ValueAggregatorJob, that offers a static function that creates map/reduce jobs.
In Hadoop Streaming, the Aggregate package is commonly used as the reducer to compute aggregate statistics.
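The generic combiner can share its logic with the reducer because statistics such as sums and maxima give the same answer whether all values are aggregated at once or first per map task and then merged. The Python sketch below only demonstrates that property on in-memory lists; the function names and the batch split are hypothetical, and this is not how Hadoop actually implements its combiner.

```python
def long_value_sum(values):
    # Sum of the integer values for one key (the LongValueSum semantics).
    return sum(values)


def long_value_max(values):
    # Largest integer value for one key (the LongValueMax semantics).
    return max(values)


def combine_then_reduce(aggregate, batches):
    # Combiner step: aggregate each map task's batch locally.
    partials = [aggregate(batch) for batch in batches]
    # Reducer step: aggregate the partial results.
    return aggregate(partials)


# Hypothetical values for a single key, split across three map tasks.
batches = [[3, 1, 4], [1, 5], [9, 2, 6]]
all_values = [v for batch in batches for v in batch]

for agg in (long_value_sum, long_value_max):
    # Both paths give the same result: 31 for the sum, 9 for the max.
    print(agg.__name__, combine_then_reduce(agg, batches), agg(all_values))
```

The property does not hold for every statistic: adding per-batch counts of distinct values, for example, would over-count values that appear in several batches.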

2. aggregate class summary

DoubleValueSum: This class implements a value aggregator that sums up a sequence of double values. Like LongValueSum, it can be used to compute Top K records.
LongValueMax: This class implements a value aggregator that maintains the maximum of a sequence of long values.
LongValueMin: This class implements a value aggregator that maintains the minimum of a sequence of long values.
LongValueSum: This class implements a value aggregator that sums up a sequence of long values.
StringValueMax: This class implements a value aggregator that maintains the biggest of a sequence of strings.
StringValueMin: This class implements a value aggregator that maintains the smallest of a sequence of strings.
UniqValueCount: This class implements a value aggregator that dedupes a sequence of objects.
UserDefinedValueAggregatorDescriptor: This class implements a wrapper for a user-defined value aggregator descriptor.
ValueAggregatorBaseDescriptor: This class implements the common functionalities of the subclasses of the ValueAggregatorDescriptor class.
ValueAggregatorCombiner: This class implements the generic combiner of Aggregate.
ValueAggregatorJob: This is the main class for creating a map/reduce job using the Aggregate framework.
ValueAggregatorJobBase: This abstract class implements some common functionalities of the generic mapper, reducer and combiner classes of Aggregate.
ValueAggregatorMapper: This class implements the generic mapper of Aggregate.
ValueAggregatorReducer: This class implements the generic reducer of Aggregate.
ValueHistogram: This class implements a value aggregator that computes the histogram of a sequence of strings.
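To make the class summary concrete, here is a plain-Python sketch of what several of the built-in value aggregators compute for the values of a single key. It only mirrors the described semantics and is not Hadoop code. The histogram entry is a simplified stand-in that just counts occurrences of each string; the actual report format of Hadoop's ValueHistogram is not reproduced.

```python
from collections import Counter

# Each entry maps an aggregator name to a function over the string values
# collected for one key (Streaming passes values as text).
AGGREGATORS = {
    "LongValueSum": lambda vs: sum(int(v) for v in vs),
    "LongValueMax": lambda vs: max(int(v) for v in vs),
    "LongValueMin": lambda vs: min(int(v) for v in vs),
    "DoubleValueSum": lambda vs: sum(float(v) for v in vs),
    # String aggregators compare lexicographically, not numerically.
    "StringValueMax": lambda vs: max(vs),
    "StringValueMin": lambda vs: min(vs),
    # Number of distinct values after deduplication.
    "UniqValueCount": lambda vs: len(set(vs)),
    # Simplified histogram: occurrences of each distinct string.
    "ValueHistogram": lambda vs: dict(Counter(vs)),
}

values = ["10", "9", "10", "200"]
for name in ("LongValueSum", "LongValueMax", "StringValueMax",
             "UniqValueCount", "ValueHistogram"):
    print(name, AGGREGATORS[name](values))
```

With these inputs LongValueMax gives 200 while StringValueMax gives "9", because "9" sorts after "200" when compared as text. Choosing between the Long and String variants therefore matters whenever the values are numbers.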

3. Using aggregate in Streaming

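In the mapper output, prefix each key with the name of the aggregator function to apply, in this format:
function:key\tvalue
For example:
LongValueSum:key\tvalue
In addition, pass -reducer aggregate to the streaming job. The reducer then uses the corresponding aggregator class from the aggregate package to process all values that share the same key. For example, with the function set to LongValueSum, the reducer sums the values for each key.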
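The reducer-side behavior described above can be simulated locally. The sketch below splits each mapper line at the first tab into a tagged key and a value, splits the tagged key at the first colon into the function name and the key, groups values by (function, key), and applies the matching aggregation. It supports only two aggregator names, and the helper names `parse_line` and `simulate_aggregate` are hypothetical; it illustrates the idea and is not the aggregate reducer's implementation.

```python
from collections import defaultdict

# Hypothetical subset of aggregators, keyed by the prefix used in mapper output.
AGGREGATE = {
    "LongValueSum": lambda vs: sum(int(v) for v in vs),
    "LongValueMax": lambda vs: max(int(v) for v in vs),
}


def parse_line(line):
    # "LongValueSum:apple\t3" -> ("LongValueSum", "apple", "3")
    tagged_key, _, value = line.rstrip("\n").partition("\t")
    function, _, key = tagged_key.partition(":")
    return function, key, value


def simulate_aggregate(lines):
    # Group values by (function, key), then apply the named aggregator.
    groups = defaultdict(list)
    for line in lines:
        function, key, value = parse_line(line)
        groups[(function, key)].append(value)
    return {(function, key): AGGREGATE[function](values)
            for (function, key), values in sorted(groups.items())}


mapper_output = [
    "LongValueSum:apple\t1",
    "LongValueSum:pear\t1",
    "LongValueSum:apple\t1",
]
for (function, key), result in simulate_aggregate(mapper_output).items():
    print(key + "\t" + str(result))
```

Keeping the function name in the grouping key means one mapper can emit lines for several aggregators at once (for example LongValueSum and LongValueMax) without their values being mixed.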


Below is an example that uses a Python mapper:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myAggregatorForKeyCount.py \
    -reducer aggregate \
    -file myAggregatorForKeyCount.py \
    -jobconf mapred.reduce.tasks=12
The Python mapper myAggregatorForKeyCount.py:

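#!/usr/bin/env python3
# Streaming mapper: emits "LongValueSum:<first field>\t1" for every input line,
# so the aggregate reducer counts how many times each key occurs.
import sys


def generateLongCountToken(key):
    # The "LongValueSum:" prefix selects the aggregator used by the reducer.
    return "LongValueSum:" + key + "\t" + "1"


def main(argv):
    # Iterating over sys.stdin stops cleanly at end of file,
    # so no exception handling is needed.
    for line in sys.stdin:
        fields = line.rstrip("\n").split("\t")
        print(generateLongCountToken(fields[0]))


if __name__ == "__main__":
    main(sys.argv)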
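Before submitting the job, the mapper logic can be checked locally. This sketch restates the mapper as a function over any iterable of lines (the name `map_lines` is hypothetical) and feeds it a small in-memory input through `io.StringIO` instead of `sys.stdin`.

```python
import io


def generateLongCountToken(key):
    # Same token format as the mapper: ask aggregate to sum a 1 for this key.
    return "LongValueSum:" + key + "\t" + "1"


def map_lines(stream):
    # Yield one token per input line, using the first tab-separated field as key.
    for line in stream:
        fields = line.rstrip("\n").split("\t")
        yield generateLongCountToken(fields[0])


sample = io.StringIO("apple\tred\npear\tgreen\napple\tgreen\n")
for token in map_lines(sample):
    print(token)
```

Because the real mapper reads standard input, piping a local file into it (for example `cat input.txt | python3 myAggregatorForKeyCount.py`) gives the same kind of check on real data; the summing itself still happens only in the aggregate reducer.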