1. 程式人生 > >大資料之使用hadoop對海量資料進行統計並排序

大資料之使用hadoop對海量資料進行統計並排序

不得不說,Hadoop確實是處理海量離線資料的利器,當然,凡是一個東西有優點必定也有缺點,hadoop的缺點也很多,比如對流式計 算,實時計算,DAG具有依賴關係的計算,支援都不友好,所以,由此誕生了很多新的分散式計算框 架,Storm,Spark,Tez,impala,drill,等等,他們都是針對特定問題提出一種解決方案,新框架的的興起,並不意味者他們就可以替 代hadoop,一手獨大,HDFS和MapReduce依舊是很優秀的,特別是對離線海量資料的處理。

hadoop在如下的幾種應用場景裡,用的還是非常廣泛的,1,搜尋引擎建索引,2,topK熱關鍵詞統計,3,海量日誌的資料分析等等。
散仙,今天的這個例子的場景要對幾億的單詞或短語做統計和並按詞頻排序,當然這些需求都是類似WordCount問題,如果你把Hadoop 自帶的WordCount的例子,給搞懂了,基本上做一些IP,熱詞的統計與分析就很很容易了,WordCount問題,確實是一個非常具有代表性的例 子。

下面進入正題,先來分析下散仙這個例子的需求,總共需要二步來完成,第一步就是對短語的統計,第二步就是對結果集的排序。所 以如果使用MapReduce來完成的話,就得需要2個作業來完成這件事情,第一個作業來統計詞頻,第二個來負責進行排序,當然這兩者之間是有依賴關係 的,第二個作業的執行,需要依賴第一個作業的結果,這就是典型的M,R,R的問題並且作業之間具有依賴關係,這種問題使用MapReduce來完成,效率 可能有點低,如果使用支援DAG作業的Tez來做這件事情,那麼就很簡單了。不過本篇散仙,要演示的例子還是基於MapReduce來完成的,有興趣的朋 友,可以研究一下使用Tez。

對於第一個作業,我們只需要改寫wordcount的例子,即可,因為散仙的需求裡面涉及短語的統計,所以定義的格式為,短語和短語之間使用分號隔開,(預設的格式是按單詞統計的,以空格為分割符)在map時只需要,按分號打散成陣列,進行處理即可,測試的資料內容如下:



map裡面的核心程式碼如下:

  /**
     * 統計詞頻的map端
     * 程式碼
     * 
     * **/
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

    	String [] data=value.toString().split(";");//按每行的分號拆分短語
    	for(String s:data){
    		if(s.trim().length()>0){//忽略空字元
    		word.set(s);
    		context.write(word, one);
    		}
    	}

    }

reduce端的核心程式碼如下:

/**
     * reduce端的
     * 程式碼
     * **/
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();//累加詞頻
      }
      result.set(sum);
      context.write(new Text(key.toString()+"::"), result);//為方便短語排序,以雙冒號分隔符間隔
    }
  }

main函式裡面的程式碼如下:

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

執行結果,如下所示:

a good student::	1
good student::	3
patient::	2
patient a::	1

下面,散仙來分析下排序作業的程式碼,如上圖所示hadoop預設的排序,是基於key排序的,如果是字元型別的則基於字典表 排序,如果是數值型別的則基於數字大小排序,兩種方式都是按預設的升序排列的,如果想要降序輸出,就需要我們自己寫個排序元件了,散仙會在下面的程式碼給出 例子,因為我們是要基於詞頻排序的,所以需要反轉K,V來實現對詞頻的排序,map端程式碼如下:

/**
		 * 排序作業
		 * map的實現
		 * 
		 * **/
		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String s[]=value.toString().split("::");//按兩個冒號拆分每行資料
			word.set(s[0]);//
			one.set(Integer.parseInt(s[1].trim()));//
			context.write(one, word);//注意,此部分,需要反轉K,V順序
		}

reduce端程式碼如下:

/***
  * 
  * 排序作業的
  * reduce程式碼
  * **/		
		@Override
		protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)
				throws IOException, InterruptedException {
			for(Text t:arg1){
				result.set(t.toString());
				 arg2.write(result, arg0);
			}
		}

下面,我們再來看下排序元件的程式碼:

/***
 * 按詞頻降序排序
 * 的類
 * 
 * **/
	public static class DescSort extends  WritableComparator{

		 public DescSort() {
			 super(IntWritable.class,true);//註冊排序元件
		}
		 @Override
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用負號來完成降序
		}

		 @Override
		public int compare(Object a, Object b) {

			return   -super.compare(a, b);//注意使用負號來完成降序
		}

	}

main方法裡面的實現程式碼如下所示:

public static void main(String[] args) throws Exception{

		  Configuration conf = new Configuration();
		    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    if (otherArgs.length != 2) {
		        System.err.println("Usage: wordcount <in> <out>");
		        System.exit(2);
		      }
		   Job job=new Job(conf, "sort");
		   job.setOutputKeyClass(IntWritable.class);
		   job.setOutputValueClass(Text.class);
		   job.setMapperClass(SortIntValueMapper.class);
		   job.setReducerClass(SortIntValueReducer.class)	;
		   job.setSortComparatorClass(DescSort.class);//加入排序元件
		   job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
		   job.setOutputFormatClass(TextOutputFormat.class);
		   FileInputFormat.setInputPaths(job, new Path(args[0]));
		   FileOutputFormat.setOutputPath(job, new Path(args[1]));

		   System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

輸出結果,如下所示:

good student	3
patient	2
a good student	1
patient a	1

至此,我們可以成功實現,統計並排序的業務,當然這種型別的需求非常多而且常見,如對某個海量日誌IP的分析,散仙上面的例 子使用的只是測試的資料,而真實資料是對幾億或幾十億的短語構建語料庫使用,配置叢集方面,可以根據自己的需求,配置叢集的節點個數以及 map,reduce的個數,而程式碼,只需要我們寫好,提交給hadoop叢集執行即可。

最後在簡單總結一下,資料處理過程中,格式是需要提前定製好的,也就是說你得很清楚的你的格式代表什麼意思,另外一點,關於 hadoop的中文編碼問題,這個是內部固定的UTF-8格式,如果你是GBK的檔案編碼,則需要自己單獨在map或reduce過程中處理一下,否則輸 出的結果可能是亂碼,最好的方法就是統一成UTF-8格式,否則,很容易出現一些編碼問題的。