Hadoop Series: Using Aggregate
1. Introduction to aggregate
aggregate is a package provided by Hadoop for common computations and aggregations.
Generally speaking, in order to implement an application using Map/Reduce model, the developer needs to implement Map and Reduce functions (and possibly Combine function). However, for a lot of applications related to counting and statistics computing, these functions have very similar characteristics. This provides a package implementing those patterns.
In Streaming, the Aggregate package is typically used as the reducer to compute aggregate statistics.
2. aggregate class summary
DoubleValueSum | This class implements a value aggregator that sums up a sequence of double values. Like LongValueSum, it can be used when computing Top K records. |
LongValueMax | This class implements a value aggregator that maintains the maximum of a sequence of long values. |
LongValueMin | This class implements a value aggregator that maintains the minimum of a sequence of long values. |
LongValueSum | This class implements a value aggregator that sums up a sequence of long values. |
StringValueMax | This class implements a value aggregator that maintains the biggest of a sequence of strings. |
StringValueMin | This class implements a value aggregator that maintains the smallest of a sequence of strings. |
UniqValueCount | This class implements a value aggregator that dedupes a sequence of objects. |
UserDefinedValueAggregatorDescriptor | This class implements a wrapper for a user defined value aggregator descriptor. |
ValueAggregatorBaseDescriptor | This class implements the common functionalities of the subclasses of ValueAggregatorDescriptor class. |
ValueAggregatorCombiner | This class implements the generic combiner of Aggregate. |
ValueAggregatorJob | This is the main class for creating a map/reduce job using Aggregate framework. |
ValueAggregatorJobBase | This abstract class implements some common functionalities of the generic mapper, reducer and combiner classes of Aggregate. |
ValueAggregatorMapper | This class implements the generic mapper of Aggregate. |
ValueAggregatorReducer | This class implements the generic reducer of Aggregate. |
ValueHistogram | This class implements a value aggregator that computes the histogram of a sequence of strings. |
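To make the table more concrete, here is a rough sketch of what several of the value aggregators compute over the values that share one key. It uses plain Python stand-ins, not Hadoop's own code; the names AGGREGATOR_SEMANTICS and describe are made up for this illustration, and ValueHistogram is left out because its output is more involved.

```python
# Plain-Python stand-ins for what each aggregator computes over the values
# that share one key. Illustration only; this is not Hadoop's implementation.
AGGREGATOR_SEMANTICS = {
    "DoubleValueSum": lambda values: sum(float(v) for v in values),
    "LongValueSum": lambda values: sum(int(v) for v in values),
    "LongValueMax": lambda values: max(int(v) for v in values),
    "LongValueMin": lambda values: min(int(v) for v in values),
    "StringValueMax": lambda values: max(values),  # string comparison
    "StringValueMin": lambda values: min(values),  # string comparison
    "UniqValueCount": lambda values: len(set(values)),
}


def describe(values):
    """Apply every stand-in aggregator to the same list of string values."""
    return {name: fn(values) for name, fn in AGGREGATOR_SEMANTICS.items()}


if __name__ == "__main__":
    # Values arrive as text, so the numeric aggregators parse them first.
    for name, result in describe(["3", "10", "3"]).items():
        print(name, result)
```

Note that the string aggregators compare text, so "3" ranks above "10" there, while the long aggregators compare the parsed numbers.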
3. Using aggregate in streaming
Add a control prefix to the mapper task's output, as follows:
function:key\tvalue
e.g.:
LongValueSum:key\tvalue
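In addition, set -reducer aggregate. The reducer then applies the corresponding function class from aggregate to the values that share the same key. For example, if function is set to LongValueSum, the values for each key are summed.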
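The effect of the reducer on LongValueSum records can be imitated locally. The following is a minimal sketch, assuming every record has the form function:key, a tab, then the value; simulate_long_value_sum is a hypothetical helper written for this illustration and is not part of Hadoop.

```python
from collections import defaultdict


def simulate_long_value_sum(mapper_lines):
    """Group 'LongValueSum:key<TAB>value' records by key and sum the values.

    A local imitation of the described behaviour, not Hadoop's reducer.
    """
    totals = defaultdict(int)
    for line in mapper_lines:
        record = line.rstrip("\n")
        if not record:
            continue  # skip blank lines
        head, value = record.split("\t", 1)
        function, key = head.split(":", 1)
        if function != "LongValueSum":
            raise ValueError("this sketch only handles LongValueSum: " + function)
        totals[key] += int(value)
    return dict(totals)


if __name__ == "__main__":
    sample = [
        "LongValueSum:apple\t1",
        "LongValueSum:pear\t1",
        "LongValueSum:apple\t1",
    ]
    for key, total in sorted(simulate_long_value_sum(sample).items()):
        print(key + "\t" + str(total))
```

With the three sample records, the key apple ends up with a total of 2 and pear with 1.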
Below is an example that uses a Python mapper:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myAggregatorForKeyCount.py \
-reducer aggregate \
-file myAggregatorForKeyCount.py \
-jobconf mapred.reduce.tasks=12
Example Python program myAggregatorForKeyCount.py:
#!/usr/bin/python
# Streaming mapper: emits one LongValueSum record per input line, so the
# aggregate reducer counts how often each key occurs.
import sys

def generateLongCountToken(id):
    # Record format read by the aggregate reducer: function:key<TAB>value
    return "LongValueSum:" + id + "\t" + "1"

def main(argv):
    for line in sys.stdin:
        # Drop the trailing newline; the first tab-separated field is the key.
        fields = line.rstrip("\n").split("\t")
        print(generateLongCountToken(fields[0]))

if __name__ == "__main__":
    main(sys.argv)
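Before submitting a job, it can help to run the mapper logic on a few in-memory lines. The sketch below assumes tab-separated input whose first field is the key; run_mapper and the sample data are invented for this illustration, and the token function is a renamed copy of the one in the mapper.

```python
import io


def generate_long_count_token(key):
    # Same record format as the mapper: LongValueSum:key<TAB>1
    return "LongValueSum:" + key + "\t" + "1"


def run_mapper(stream):
    """Return the records the mapper would print for each line of `stream`."""
    records = []
    for line in stream:
        fields = line.rstrip("\n").split("\t")
        records.append(generate_long_count_token(fields[0]))
    return records


if __name__ == "__main__":
    # Hypothetical sample input: key, then an unrelated second column.
    sample_input = io.StringIO("apple\tred\npear\tgreen\napple\tgreen\n")
    for record in run_mapper(sample_input):
        print(record)
```

Each input line yields exactly one record, so the reducer's sum for a key equals the number of lines that start with that key.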