
Simple examples of common Hadoop algorithms

Example 1: Sort the following data in descending order by the final surplus, which is income minus expenses. The data is as follows:
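The input that SumMapper expects is tab-separated text with three fields per line: account, income, expenses (an account may appear on several lines, which is why SumStep aggregates). The lines below are only a made-up illustration, not the original data set:

zhangsan	5000	1000
zhangsan	2000	500
lisi	3000	2000
wangwu	8000	1500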


SumStep aggregates the income and expenses of each account and writes, for every account, one record containing its total income, total expenses and the resulting surplus.

SortStep then takes SumStep's output and sorts those records by surplus, from largest to smallest.

The code is as follows:

public class InfoBean implements WritableComparable<InfoBean>{

	private String account;
	
	private double income;
	
	private double expenses;
	
	private double surplus;
	
	public void set(String account, double income, double expenses){
		this.account = account;
		this.income = income;
		this.expenses = expenses;
		this.surplus = income - expenses;
	}
	
	@Override
	public String toString() {
		return this.income + "\t" + this.expenses + "\t" + this.surplus;
	}

	/**
	 * serialize
	 */
	public void write(DataOutput out) throws IOException {
		out.writeUTF(account);
		out.writeDouble(income);
		out.writeDouble(expenses);
		out.writeDouble(surplus);
	}

	/**
	 * deserialize
	 */
	public void readFields(DataInput in) throws IOException {
		this.account = in.readUTF();
		this.income = in.readDouble();
		this.expenses = in.readDouble();
		this.surplus = in.readDouble();
	}
	

	/**
	 * Sort by surplus in descending order; when the surplus is equal,
	 * fall back to income, also in descending order.
	 */
	public int compareTo(InfoBean o) {
		if(this.surplus == o.getSurplus()){
			return this.income > o.getIncome() ? -1 : 1;
		} else {
			return this.surplus > o.getSurplus() ? -1 : 1;
		}
	}

	public String getAccount() {
		return account;
	}

	public void setAccount(String account) {
		this.account = account;
	}

	public double getIncome() {
		return income;
	}

	public void setIncome(double income) {
		this.income = income;
	}

	public double getExpenses() {
		return expenses;
	}

	public void setExpenses(double expenses) {
		this.expenses = expenses;
	}

	public double getSurplus() {
		return surplus;
	}

	public void setSurplus(double surplus) {
		this.surplus = surplus;
	}

	
}
public class SumStep {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(SumStep.class);
		
		job.setMapperClass(SumMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(InfoBean.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		job.setReducerClass(SumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(InfoBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
	}

	public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{

		private InfoBean bean = new InfoBean();
		private Text k = new Text();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// split 
			String line = value.toString();
			String[] fields = line.split("\t");
			// get useful field
			String account = fields[0];
			double income = Double.parseDouble(fields[1]);
			double expenses = Double.parseDouble(fields[2]);
			k.set(account);
			bean.set(account, income, expenses);
			context.write(k, bean);
		}
	}
	
	public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{

		private InfoBean bean = new InfoBean();
		@Override
		protected void reduce(Text key, Iterable<InfoBean> v2s, Context context)
				throws IOException, InterruptedException {
			
			double in_sum = 0;
			double out_sum = 0;
			for(InfoBean bean : v2s){
				in_sum += bean.getIncome();
				out_sum += bean.getExpenses();
			}
			bean.set("", in_sum, out_sum);
			context.write(key, bean);
		}
		
	}
}

The input here is the output of SumStep rather than the original file. The two jobs could of course be chained and run as one program; a minimal sketch of that is shown right after the SortStep code below.
public class SortStep {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(SortStep.class);
		
		job.setMapperClass(SortMapper.class);
		job.setMapOutputKeyClass(InfoBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		job.setReducerClass(SortReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(InfoBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);

	}

	public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable>{

		private InfoBean bean = new InfoBean();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split("\t");
			String account = fields[0];
			double income = Double.parseDouble(fields[1]);
			double expenses = Double.parseDouble(fields[2]);
			bean.set(account, income, expenses);
			context.write(bean, NullWritable.get());
		}
		
	}
	
	
	public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{

		private Text k = new Text();
		@Override
		protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context)
				throws IOException, InterruptedException {
			String account = bean.getAccount();
			k.set(account);
			context.write(k, bean);
		}
		
	}
}
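A minimal sketch of chaining the two jobs in a single driver, as mentioned above. This class is not part of the original post; it assumes SumStep and SortStep are on the classpath, reuses their mapper and reducer classes, and (like the rest of the post) omits the import statements. args[1] is used as the intermediate directory:

public class SumThenSort {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		// first job: aggregate income/expenses per account
		Job sumJob = Job.getInstance(conf, "sum");
		sumJob.setJarByClass(SumStep.class);
		sumJob.setMapperClass(SumStep.SumMapper.class);
		sumJob.setReducerClass(SumStep.SumReducer.class);
		sumJob.setOutputKeyClass(Text.class);
		sumJob.setOutputValueClass(InfoBean.class);
		FileInputFormat.setInputPaths(sumJob, new Path(args[0]));
		FileOutputFormat.setOutputPath(sumJob, new Path(args[1])); // intermediate output

		// only start the second job if the first one succeeded
		if (!sumJob.waitForCompletion(true)) {
			System.exit(1);
		}

		// second job: sort the aggregated records by surplus
		Job sortJob = Job.getInstance(conf, "sort");
		sortJob.setJarByClass(SortStep.class);
		sortJob.setMapperClass(SortStep.SortMapper.class);
		sortJob.setMapOutputKeyClass(InfoBean.class);
		sortJob.setMapOutputValueClass(NullWritable.class);
		sortJob.setReducerClass(SortStep.SortReducer.class);
		sortJob.setOutputKeyClass(Text.class);
		sortJob.setOutputValueClass(InfoBean.class);
		FileInputFormat.setInputPaths(sortJob, new Path(args[1])); // the first job's output
		FileOutputFormat.setOutputPath(sortJob, new Path(args[2]));

		System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
	}
}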

Example 2: Inverted index. The process is as follows:
Map phase
<0,"hello tom">
....


context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);

context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
--------------------------------------------------------
Combiner phase
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>

<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>

context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
--------------------------------------------------------
Reducer phase
<"hello",{"a.txt->5","b.txt->3"}>


context.write("hello","a.txt->5 b.txt->3");
-------------------------------------------------------
hello	"a.txt->5 b.txt->3"
tom		"a.txt->2 b.txt->1"
kitty	"a.txt->1"
.......
The code is as follows:
public class InverseIndex {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		//set the jar class
		job.setJarByClass(InverseIndex.class);
		
		//configure the Mapper
		job.setMapperClass(IndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));//words.txt
		
		//configure the Reducer
		job.setReducerClass(IndexReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setCombinerClass(IndexCombiner.class);
				
		//submit the job
		job.waitForCompletion(true);
	}
	public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text>{

		private Text k = new Text();
		private Text v = new Text();
		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split(" ");
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			Path path = inputSplit.getPath();
			String name = path.getName();
			for(String f : fields){
				k.set(f + "->" + name);
				v.set("1");
				context.write(k, v);
			}
		}
		
	}
	public static class IndexCombiner extends Reducer<Text, Text, Text, Text>{

		private Text k = new Text();
		private Text v = new Text();
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] fields = key.toString().split("->");
			long sum = 0;
			for(Text t : values){
				sum += Long.parseLong(t.toString());
			}
			k.set(fields[0]);
			v.set(fields[1] + "->" + sum);
			context.write(k, v);
		}
		
	}
	public static class IndexReducer extends Reducer<Text, Text, Text, Text>{

		private Text v = new Text();
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String value = "";
			for(Text t : values){
				value += t.toString() + " ";
			}
			v.set(value);
			context.write(key, v);
		}
		
	}

}

Example 3: Use a Partitioner so that identical or related records are sent to the same reduce task:

The data format is as follows; the fields are the phone number, upstream traffic and downstream traffic:
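The input that DCMapper expects is tab-separated text with three fields per line: phone number, upstream traffic, downstream traffic. The lines below are only a made-up illustration (the 132 prefix is not in providerMap and therefore falls into partition 0):

13912345678	1024	4096
13812345678	512	2048
15912345678	256	1024
13212345678	128	512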


The code is as follows:

public class DataBean implements Writable {

	private String telNo;
	
	private long upPayLoad;
	
	private long downPayLoad;
	
	private long totalPayLoad;
	
	public DataBean(){}
	
	public DataBean(String telNo, long upPayLoad, long downPayLoad) {
		super();
		this.telNo = telNo;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = upPayLoad + downPayLoad;
	}
	
	@Override
	public String toString() {
		return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
	}

	/**
	 * Serialize.
	 * Note: 1. the field types  2. the field order
	 */
	public void write(DataOutput out) throws IOException {
		out.writeUTF(telNo);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
		out.writeLong(totalPayLoad);
	}

	/**
	 * Deserialize
	 */
	public void readFields(DataInput in) throws IOException {
		this.telNo = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
		this.totalPayLoad = in.readLong();
	}

	public String getTelNo() {
		return telNo;
	}

	public void setTelNo(String telNo) {
		this.telNo = telNo;
	}

	public long getUpPayLoad() {
		return upPayLoad;
	}

	public void setUpPayLoad(long upPayLoad) {
		this.upPayLoad = upPayLoad;
	}

	public long getDownPayLoad() {
		return downPayLoad;
	}

	public void setDownPayLoad(long downPayLoad) {
		this.downPayLoad = downPayLoad;
	}

	public long getTotalPayLoad() {
		return totalPayLoad;
	}

	public void setTotalPayLoad(long totalPayLoad) {
		this.totalPayLoad = totalPayLoad;
	}
	
}

public class DataCount {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(DataCount.class);
		
		job.setMapperClass(DCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DataBean.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		job.setReducerClass(DCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		//set a custom Partitioner (replacing the default HashPartitioner)
		job.setPartitionerClass(ServiceProviderPartitioner.class);
		//the number of reduce tasks must be set here
		job.setNumReduceTasks(Integer.parseInt(args[2]));
		
		job.waitForCompletion(true);
	}
	
	
	public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			//read the line
			String line = value.toString();
			//split the line
			String[] fields = line.split("\t");
			//extract the useful fields and wrap them in an object
			//phone number
			String telNo = fields[0];
			//upstream traffic
			long up = Long.parseLong(fields[1]);
			//downstream traffic
			long down = Long.parseLong(fields[2]);
			
			//wrap the data in a new DataBean
			DataBean bean = new DataBean(telNo, up, down);
			
			//emit
			context.write(new Text(telNo), bean);
		}
	}
	
	
	public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{

		@Override
		protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
				throws IOException, InterruptedException {
			//running totals
			long up_sum = 0;
			long down_sum = 0;
			
			//iterate over v2s and sum up
			for(DataBean bean : v2s){
				up_sum += bean.getUpPayLoad();
				down_sum += bean.getDownPayLoad();
			}
			
			//wrap the totals in a bean
			DataBean bean = new DataBean("", up_sum, down_sum);
			
			//emit
			context.write(key, bean);
		}
		
	}

	public static class ServiceProviderPartitioner extends Partitioner<Text, DataBean>{
	
		private static Map<String, Integer> providerMap = new HashMap<String, Integer>();
 		
		static {
			providerMap.put("139", 1);
			providerMap.put("138", 2);
			providerMap.put("159", 3);
		}
		
		@Override
		public int getPartition(Text key, DataBean value, int number) {
			String telNo = key.toString();
			String pcode = telNo.substring(0, 3);
			Integer p = providerMap.get(pcode);
			if(p == null){
				p = 0;
			}
			return p;
		}
		
	}
}
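One thing worth noting about ServiceProviderPartitioner: it can return partition numbers 0 through 3, so running the job with 2 or 3 reduce tasks makes the framework reject the out-of-range partition with an "Illegal partition" error (a single reduce task still works, because the partitioner is bypassed in that case). A small defensive sketch, not part of the original code, that the driver could use to validate args[2]:

	// hypothetical helper; main() could call job.setNumReduceTasks(parseReduceTasks(args[2]))
	static int parseReduceTasks(String arg) {
		int reduceTasks = Integer.parseInt(arg);
		if (reduceTasks > 1 && reduceTasks < 4) {
			// partitions 0-3 are possible, so 2 or 3 reducers would fail at runtime
			throw new IllegalArgumentException("need at least 4 reduce tasks (or exactly 1), got " + reduceTasks);
		}
		return reduceTasks;
	}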

Example 4: Implement the following simple algorithm; the MR program involves concepts such as reduce-side grouping:
#When the first column is the same, find the minimum value of the second column
3	3
3	2
3	1
2	2
2	1
1	1
----------Result---------
3	1
2	1
1	1

public class GroupApp {
	static final String INPUT_PATH = "hdfs://xxx:9000/input";
	static final String OUT_PATH = "hdfs://xxx:9000/out";
	public static void main(String[] args) throws Exception{
		final Configuration configuration = new Configuration();
		
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
		if(fileSystem.exists(new Path(OUT_PATH))){
			fileSystem.delete(new Path(OUT_PATH), true);
		}
		
		final Job job = Job.getInstance(configuration, GroupApp.class.getSimpleName());
		
		//1.1 specify the input file path
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//specify which class is used to parse the input file
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 specify the custom Mapper class
		job.setMapperClass(MyMapper.class);
		//specify the types of the output <k2,v2>
		job.setMapOutputKeyClass(NewK2.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		//1.3 specify the partitioner class
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		
		//1.4 TODO sorting and grouping
		job.setGroupingComparatorClass(MyGroupingComparator.class);
		//1.5 TODO (optional) combiner
		
		//2.2 specify the custom reduce class
		job.setReducerClass(MyReducer.class);
		//specify the types of the output <k3,v3>
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 specify where the output goes
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		//set the output file format class
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//submit the code to the JobTracker for execution
		job.waitForCompletion(true);
	}

	
	static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
		protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
			final String[] splited = value.toString().split("\t");
			final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
			final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
			context.write(k2, v2);
		};
	}
	
	static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
		protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context context) throws java.io.IOException, InterruptedException {
			long min = Long.MAX_VALUE;
			for (LongWritable v2 : v2s) {
				if(v2.get()<min){
					min = v2.get();
				}
			}
			
			context.write(new LongWritable(k2.first), new LongWritable(min));
		};
	}
	
	/**
	 * Q: Why implement this class?
	 * A: Because the original v2 cannot take part in sorting, the original k2 and v2
	 *    are wrapped together into one class, which is used as the new k2.
	 */
	static class  NewK2 implements WritableComparable<NewK2>{
		Long first;
		Long second;
		
		public NewK2(){}
		
		public NewK2(long first, long second){
			this.first = first;
			this.second = second;
		}
		
		
		public void readFields(DataInput in) throws IOException {
			this.first = in.readLong();
			this.second = in.readLong();
		}

		public void write(DataOutput out) throws IOException {
			out.writeLong(first);
			out.writeLong(second);
		}

		/**
		 * Called when k2 is sorted.
		 * When the first column differs, sort by it in ascending order; when the first
		 * column is the same, sort by the second column in ascending order.
		 */
		public int compareTo(NewK2 o) {
			final int cmp = this.first.compareTo(o.first);
			if(cmp != 0){
				return cmp;
			}
			return this.second.compareTo(o.second);
		}
		
		@Override
		public int hashCode() {
			return this.first.hashCode()+this.second.hashCode();
		}
		
		@Override
		public boolean equals(Object obj) {
			if(!(obj instanceof NewK2)){
				return false;
			}
			NewK2 oK2 = (NewK2)obj;
			return this.first.equals(oK2.first) && this.second.equals(oK2.second);
		}
	}
	
	/**
	 * Q: Why define this custom class?
	 * A: The requirement is to group by the first column, but NewK2's comparison rule
	 *    makes that impossible, so a custom grouping comparator has to be defined.
	 */
	static class MyGroupingComparator implements RawComparator<NewK2>{

		public int compare(NewK2 o1, NewK2 o2) {
			return o1.first.compareTo(o2.first);
		}
		/**
		 * @param arg0 the first byte array being compared
		 * @param arg1 the start offset of the first key within that array
		 * @param arg2 the length in bytes of the first key
		 * 
		 * @param arg3 the second byte array being compared
		 * @param arg4 the start offset of the second key within that array
		 * @param arg5 the length in bytes of the second key
		 * 
		 * Only the first 8 bytes of each key (the serialized "first" field) are
		 * compared, so keys are grouped by the first column.
		 */
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8);
		}
		
	}
}
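As an aside (not part of the original post), the same grouping rule could also be written by extending WritableComparator instead of implementing RawComparator by hand; Hadoop then deserializes the keys and calls the object-level compare. A minimal sketch, assuming it sits next to MyGroupingComparator inside GroupApp:

	static class MyGroupingComparator2 extends WritableComparator {

		protected MyGroupingComparator2() {
			super(NewK2.class, true); // true: let the comparator create NewK2 instances for deserialization
		}

		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			NewK2 k1 = (NewK2) a;
			NewK2 k2 = (NewK2) b;
			// group only by the first column, matching MyGroupingComparator
			return k1.first.compareTo(k2.first);
		}
	}

It would be registered the same way, with job.setGroupingComparatorClass(MyGroupingComparator2.class).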

Example 5: Use MapReduce to find the maximum value in a huge data file, using the cleanup() method of the Mapper class, because cleanup() runs only after all of the map() calls have finished.

public class TopKApp {
	static final String INPUT_PATH = "hdfs://xxx:9000/input";
	static final String OUT_PATH = "hdfs://xxx:9000/out";
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}
		
		final Job job = Job.getInstance(conf, TopKApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);
		FileOutputFormat.setOutputPath(job, outPath);
		job.waitForCompletion(true);
	}
	static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
		long max = Long.MIN_VALUE;
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			final long temp = Long.parseLong(v1.toString());
			if(temp>max){
				max = temp;
			}
		};
		
		protected void cleanup(Context context) throws java.io.IOException, InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		};
	}
	
	static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
		long max = Long.MIN_VALUE;
		protected void reduce(LongWritable k2, Iterable<NullWritable> v2s, Context context) throws java.io.IOException, InterruptedException {
			final long temp = k2.get();
			if(temp>max){
				max = temp;
			}
		};
		
		protected void cleanup(Context context) throws java.io.IOException, InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		};
	}		
}
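Although the class is named TopKApp, the code above only keeps the single maximum. As a variation (an assumption, not from the original post), the same map()/cleanup() pattern extends to a real top-K by keeping a bounded TreeMap in the mapper, and applying the same idea again in the reducer. Note that duplicate values collapse because they are used as map keys, and that java.util.TreeMap would need to be imported (imports are omitted throughout the post):

	static class TopKMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
		private static final int K = 3; // hypothetical K
		private final TreeMap<Long, Long> topK = new TreeMap<Long, Long>();

		@Override
		protected void map(LongWritable k1, Text v1, Context context) {
			long val = Long.parseLong(v1.toString());
			topK.put(val, val);
			if (topK.size() > K) {
				topK.remove(topK.firstKey()); // drop the smallest so only K values remain
			}
		}

		@Override
		protected void cleanup(Context context) throws java.io.IOException, InterruptedException {
			// emit the K largest values seen by this map task
			for (Long val : topK.descendingKeySet()) {
				context.write(new LongWritable(val), NullWritable.get());
			}
		}
	}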

Example 6: Counters:

public class WordCountApp {
	static final String INPUT_PATH = "hdfs://xxx:9000/hello";
	static final String OUT_PATH = "hdfs://xxx:9000/out";
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}
		
		final Job job =  Job.getInstance(conf , WordCountApp.class.getSimpleName());
		//1.1 specify where the input file is read from
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//specify how the input file is parsed: each line becomes a key-value pair
		//job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 specify the custom map class
		job.setMapperClass(MyMapper.class);
		//the <k,v> types of the map output; can be omitted when <k3,v3> has the same types as <k2,v2>
		//job.setMapOutputKeyClass(Text.class);
		//job.setMapOutputValueClass(LongWritable.class);
		
		//1.3 partitioning
		//job.setPartitionerClass(HashPartitioner.class);
		//run with a single reduce task
		//job.setNumReduceTasks(1);
		
		//1.4 TODO sorting and grouping
		
		//1.5 TODO combiner
		
		//2.2 specify the custom reduce class
		job.setReducerClass(MyReducer.class);
		//specify the output types of the reduce
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 specify where the output is written
		FileOutputFormat.setOutputPath(job, outPath);
		//specify the output file format class
		//job.setOutputFormatClass(TextOutputFormat.class);
		
		//submit the job to the JobTracker for execution
		job.waitForCompletion(true);
	}
	
	/**
	 * KEYIN	i.e. k1		the byte offset of the line
	 * VALUEIN	i.e. v1		the text content of the line
	 * KEYOUT	i.e. k2		a word appearing in the line
	 * VALUEOUT	i.e. v2		the number of times the word appears, fixed at 1
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
			
			final String line = v1.toString();
			if(line.contains("hello")){
				//record that the sensitive word appeared in this line
				helloCounter.increment(1L);
			}
			final String[] splited = line.split("\t");
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
			}
		};
	}
	
	/**
	 * KEYIN	i.e. k2		a word appearing in a line
	 * VALUEIN	i.e. v2		the number of times the word appears in that line
	 * KEYOUT	i.e. k3		a distinct word appearing in the text
	 * VALUEOUT	i.e. v3		the total number of times that distinct word appears
	 */
	static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
			}
			ctx.write(k2, new LongWritable(times));
		};
	}
		
}
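When the job finishes, Hadoop prints all counters, including the custom "Sensitive Words" group, in the client's console output. If the value is also needed programmatically, the last line of main() could be replaced with something like the following sketch (an assumption, not shown in the original post):

		if (job.waitForCompletion(true)) {
			long hello = job.getCounters().findCounter("Sensitive Words", "hello").getValue();
			System.out.println("\"hello\" appeared in " + hello + " line(s)");
		}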