程式人生 > 其它 > 03_MapReduce框架原理_3.9 合併 Combiner(Map端合併)

03_MapReduce框架原理_3.9 合併 Combiner(Map端合併)

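1. 說明

Combiner 在 Map 端先對 MapTask 的輸出做一次局部彙總(例如把同一個單詞的多個 1 先加總),以減少 Shuffle 階段經網路傳送給 Reducer 的資料量。要點:
- Combiner 必須是 Reducer 的子類(見下方 setCombinerClass 原始碼中的型別檢查)。
- Combiner 只彙總單個 MapTask 的輸出;Reducer 則接收所有 MapTask 的輸出做全域彙總。
- 框架對同一個 key 可能呼叫 Combiner 零次、一次或多次,因此它不能改變最終結果:求和、求最大值可以,求平均值則不行。
- Combiner 的輸入 key-value 型別必須與 Mapper 的輸出型別一致,其輸出型別必須與 Reducer 的輸入型別一致。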

2. 指定合併器(Job.setCombinerClass 原始碼)

  /**
   * Registers the combiner class for this job.
   *
   * <p>The job state is validated first, so the configuration is only modified while
   * the job is still in the {@code JobState.DEFINE} state.
   *
   * @param cls the combiner implementation; it is stored under {@code COMBINE_CLASS_ATTR}
   *     and validated against {@link Reducer} as its required supertype, i.e. a combiner
   *     must be a Reducer subclass
   * @throws IllegalStateException declared by this method; presumably raised by
   *     {@code ensureState} when the job has left the DEFINE state — confirm against its
   *     definition
   */
  public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    // setClass's last argument is the supertype the supplied class is checked against.
    final Class<?> requiredSupertype = Reducer.class;
    conf.setClass(COMBINE_CLASS_ATTR, cls, requiredSupertype);
  }

3. 案例(Scala 版 WordCount,使用自訂 Combiner)

package CombinerPk {

  import java.lang

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
  import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

  /**
   * Word-count mapper: emits `(word, 1)` for every whitespace-separated token of a line.
   *
   * The output key/value Writables are reused across calls (the usual MapReduce idiom:
   * the pair is serialized by `context.write` before the next `set`).
   */
  class WCComMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    private val word = new Text
    private val one = new IntWritable(1)

    /** Called once per input record (one line of text); the byte-offset `key` is unused. */
    override def map(key: LongWritable,
                     value: Text,
                     context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
      // Split on any run of whitespace and skip empty tokens, so repeated spaces, tabs,
      // or leading whitespace do not get counted as an empty-string "word".
      for (token <- value.toString.split("\\s+") if token.nonEmpty) {
        word.set(token)
        context.write(word, one)
      }
    }
  }

  /**
   * Word-count reducer: sums all counts received for each word.
   *
   * Summation is associative and commutative, and the input and output types are
   * identical, which is why the same logic can also serve as the combiner.
   */
  class WCComReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    private val total = new IntWritable

    /** Called once per distinct key with every count emitted for that key. */
    override def reduce(key: Text,
                        values: lang.Iterable[IntWritable],
                        context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      var sum = 0
      values.forEach(v => sum += v.get)
      total.set(sum)
      context.write(key, total)
    }
  }

  /**
   * Map-side combiner: pre-aggregates partial word counts before the shuffle.
   *
   * Reuses [[WCComReducer]]'s summing logic instead of duplicating it. Hadoop may apply a
   * combiner zero, one, or several times per key, so it must be safe on partial data —
   * summation is. Its input and output types both equal the mapper's output types.
   */
  class WCCombiner extends WCComReducer

  /** Job driver: configures and submits the word-count job with a map-side combiner. */
  object Driver {
    private val DefaultInput = "src/main/data/input/1.txt"
    private val DefaultOutput = "src/main/data/output"

    /**
     * Entry point.
     *
     * @param args optional `[input] [output]` paths; when absent, the local sample paths
     *             used by this tutorial are taken. The output directory should not already
     *             exist (Hadoop's FileOutputFormat rejects an existing output directory).
     */
    def main(args: Array[String]): Unit = {
      val inputPath = args.lift(0).getOrElse(DefaultInput)
      val outputPath = args.lift(1).getOrElse(DefaultOutput)

      // 1. Build the configuration (reads core-default.xml, core-site.xml) and the job.
      val configuration = new Configuration
      val job: Job = Job.getInstance(configuration)

      // 2. Identify the jar to ship by a class it contains.
      job.setJarByClass(this.getClass)
      job.setJobName("scala mr")

      // 3. Register the Mapper and Reducer classes.
      job.setMapperClass(classOf[WCComMapper])
      job.setReducerClass(classOf[WCComReducer])

      // 4. Map output key/value types (also the combiner's input and output types).
      job.setMapOutputKeyClass(classOf[Text])
      job.setMapOutputValueClass(classOf[IntWritable])

      // 5. Final (reducer) output key/value types.
      job.setOutputKeyClass(classOf[Text])
      job.setOutputValueClass(classOf[IntWritable])

      // 6. Input and output paths.
      FileInputFormat.setInputPaths(job, inputPath)
      FileOutputFormat.setOutputPath(job, new Path(outputPath))

      // 7. Register the combiner (map-side pre-aggregation) — not a partitioner.
      job.setCombinerClass(classOf[WCCombiner])

      // 8. Submit, wait while printing progress, and map success/failure to the exit code.
      val succeeded: Boolean = job.waitForCompletion(true)
      System.exit(if (succeeded) 0 else 1)
    }
  }
}