package icu.shaoyayu.hadoop;

import icu.shaoyayu.hadoop.map.MyMapper;
import icu.shaoyayu.hadoop.reduce.MyReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Computes, for each time period, the behavior that occurs the most times.
 *
 * @author shaoyayu
 */
public class App {
    // Entry point: builds a Job from a Configuration and declares the hard-coded input and output paths.
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException {
        // The 'true' argument asks Configuration to load its default resources.
        Configuration config = new Configuration(true);
        Job job = Job.getInstance(config);

        // Hard-coded location of the CSV file used as job input.
        Path inputPath = new Path("/user/root/user/mgs/tianmao/tianchi_mobile_recommend_train_user.csv");

        // Hard-coded location for the job output.
        Path outputPath = new Path("/user/root/user/mgs/outputTianMao");
        // Ask the file system that owns outputPath whether the path already exists.
        // NOTE(review): the body of this branch and the remainder of main are missing from this listing.
        if (outputPath.getFileSystem(config).exists(outputPath)){





package icu.shaoyayu.hadoop.map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

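/**
 * &lt;KEYIN, VALUEIN&gt;: the input types; they depend on the type of data obtained
 * for each line of the input split.
 * &lt;KEYOUT, VALUEOUT&gt;: the types of the data emitted to the reducer.
 *
 * @author 邵涯語
 * @date 2020/4/10 22:26
 * @Version :
 */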
public class MyMapper extends Mapper<Object, Text, IntWritable, Text> {

    private Text word = new Text();

     * map方法會被多次呼叫
     * @param key   字串的偏移量,
     * @param value 行的資料
     * @param context   上下文
     * @throws IOException
     * @throws InterruptedException
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        if (split.length!=6){
        String[] times = split[split.length-1].split(" ");
        if (times.length!=2){
        IntWritable time = new IntWritable(Integer.valueOf(times[1]));
        context.write(time, word);



package icu.shaoyayu.hadoop.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * &lt;IntWritable, Text&gt;: the input types; these come from the output of the map phase.
 * &lt;IntWritable, IntWritable&gt;: the user-defined output types.
 *
 * @author 邵涯語
 * @date 2020/4/10 22:27
 * @Version :
 */
public class MyReducer extends Reducer<IntWritable, Text, IntWritable, IntWritable> {

    private IntWritable result = new IntWritable();

    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (Text val : values) {
            sum = sum+1;
        context.write(key, result);



hadoop jar [jar file name] [entry-point package name.class name]


package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

 * Maps input key/value pairs to a set of intermediate key/value pairs.  
 * <p>Maps are the individual tasks which transform input records into 
 * intermediate records. The transformed intermediate records need not be of 
 * the same type as the input records. A given input pair may map to zero or 
 * many output pairs.</p> 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each 
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for 
 * the job via the {@link JobContext#getConfiguration()}.
 * <p>The framework first calls 
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
 * for each key/value pair in the <code>InputSplit</code>. Finally 
 * {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.</p>
 * <p>All intermediate values associated with a given output key are 
 * subsequently grouped by the framework, and passed to a {@link Reducer} to  
 * determine the final output. Users can control the sorting and grouping by 
 * specifying two key {@link RawComparator} classes.</p>
 * <p>The <code>Mapper</code> outputs are partitioned per 
 * <code>Reducer</code>. Users can control which keys (and hence records) go to 
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
 * <p>Users can optionally specify a <code>combiner</code>, via 
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
 * intermediate outputs, which helps to cut down the amount of data transferred 
 * from the <code>Mapper</code> to the <code>Reducer</code>.
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper 
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote>
 * <p>Applications may override the
 * {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
 * etc.</p>
 * @see InputFormat
 * @see JobContext
 * @see Partitioner  
 * @see Reducer
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

   * The <code>Context</code> passed on to the {@link Mapper} implementations.
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
   * Called once at the beginning of the task.
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING

   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);

   * Called once at the end of the task.
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
  public void run(Context context) throws IOException, InterruptedException {
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    } finally {


package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

import java.util.Iterator;

 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.  
 * <p><code>Reducer</code> implementations 
 * can access the {@link Configuration} for the job via the 
 * {@link JobContext#getConfiguration()} method.</p>

 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *   <b id="Shuffle">Shuffle</b>
 *   <p>The <code>Reducer</code> copies the sorted output from each 
 *   {@link Mapper} using HTTP across the network.</p>
 *   </li>
 *   <li>
 *   <b id="Sort">Sort</b>
 *   <p>The framework merge sorts <code>Reducer</code> inputs by 
 *   <code>key</code>s 
 *   (since different <code>Mapper</code>s may have output the same key).</p>
 *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
 *   being fetched they are merged.</p>
 *   <b id="SecondarySort">SecondarySort</b>
 *   <p>To achieve a secondary sort on the values returned by the value 
 *   iterator, the application should extend the key with the secondary
 *   key and define a grouping comparator. The keys will be sorted using the
 *   entire key, but will be grouped using the grouping comparator to decide
 *   which keys and values are sent in the same call to reduce. The grouping 
 *   comparator is specified via 
 *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
 *   controlled by 
 *   {@link Job#setSortComparatorClass(Class)}.</p>
 *   For example, say that you want to find duplicate web pages and tag them 
 *   all with the url of the "best" known example. You would set up the job 
 *   like:
 *   <ul>
 *     <li>Map Input Key: url</li>
 *     <li>Map Input Value: document</li>
 *     <li>Map Output Key: document checksum, url pagerank</li>
 *     <li>Map Output Value: url</li>
 *     <li>Partitioner: by checksum</li>
 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
 *     <li>OutputValueGroupingComparator: by checksum</li>
 *   </ul>
 *   </li>
 *   <li>   
 *   <b id="Reduce">Reduce</b>
 *   <p>In this phase the 
 *   {@link #reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
 *   method is called for each <code>&lt;key, (collection of values)&gt;</code> in
 *   the sorted inputs.</p>
 *   <p>The output of the reduce task is typically written to a 
 *   {@link RecordWriter} via 
 *   {@link Context#write(Object, Object)}.</p>
 *   </li>
 * </ol>
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class IntSumReducer&lt;Key&gt; extends Reducer&lt;Key,IntWritable,
 *                                                 Key,IntWritable&gt; {
 *   private IntWritable result = new IntWritable();
 *   public void reduce(Key key, Iterable&lt;IntWritable&gt; values,
 *                      Context context) throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable val : values) {
 *       sum += val.get();
 *     }
 *     result.set(sum);
 *     context.write(key, result);
 *   }
 * }
 * </pre></blockquote>
 * @see Mapper
 * @see Partitioner

  /**
   * The <code>Context</code> passed on to the {@link Reducer} implementations.
   */
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING

  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
    } finally {