1. 程式人生 > >MapReduce實現join

MapReduce實現join

在我們平常的大資料專案開發和專案需求中,可能需要我們完成在關係型資料庫中十分常見的join類功能。那麼針對這種型別的功能需求,用hadoop中的MapReduce模型應該要怎麼實現呢?本篇文章將針對這種功能需求提供幾種實現選擇。

首先,我的開發環境為:jdk1.7hadoop2.6.4CentOS7

1. 利用DistributedCache實現Join

DistributedCache:這是Hadoop自帶的一個快取檔案功能,通過這個功能Hadoop可以將使用者指定的整個檔案拷貝分發到Job所執行的所有節點上,在各個節點上可以通過特定的介面訪問讀取這個快取的檔案。

在Hadoop中,join功能的實現可以發生在map端,也可以在reduce端實現,下面將分在map端和reduce端實現join來講解如何通過DistributedCache來實現Join。

1.1 實現map端join

場景:我們以部門、員工場景為例,部門和員工資訊分別存放在不同的兩個檔案中,檔案格式分別如下:

員工檔案內容如下:
員工號 員工生日 firstname lastname 性別 入職日期 所屬部門號
10001 1953-09-02 Georgi Facello M 1986-06-26 d005
10002 1964-06-02 Bezalel Simmel F 1985-11-21 d007
10003 1959-12-03 Parto Bamford M 1986-08-28 d004
10004 1954-05-01 Chirstian Koblick M 1986-12-01 d004
10005 1955-01-21 Kyoichi Maliniak M 1989-09-12 d003
10006 1953-04-20 Anneke Preusig F 1989-06-02 d005
10009 1952-04-19 Sumant Peac F 1985-02-18 d006

部門檔案內容如下:
部門號 部門名稱
d001 Marketing
d002 Finance
d003 Human Resources
d004 Production
d005 Development
d006 Quality Management
d007 Sales
d008 Research
d009 Customer Service

需要完成的功能:輸出員工資訊以及其所在部門的部門名稱。

分析:現在我們有兩個檔案需要輸入到MapReduce中,去進行Join操作,並且不打算用多個Mapper實現類來分別處理這兩個文字檔案,那麼在這種情況下,我們就可以使用DistributedCache這個功能,那麼我們應該將哪個檔案快取呢?小的那個,因為DistributedCache是將要整個檔案拷貝複製到各個節點上的,太大佔用的記憶體空間和網路傳輸的時間都將增大,所以建議將比較小的檔案作為DistributedCache快取檔案。我這裡是做測試,用到的檔案都是很小的檔案,我這裡指定部門檔案作為快取檔案。(如果要進行join的檔案都很大,那麼不建議使用DistributedCache功能實現join,可以選擇實現多個Mapper類來完成這個功能,這個下面將會講到)。那就具體的程式碼實現以及注意的地方有哪些呢?下面將在程式碼中指出。

Driver.java-mapreduce主程式類

package com.shell.join.mapsidejoin;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.shell.count.WordCount;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // 輸入3個引數,分別指定:輸入檔案,輸出檔案目錄,以及要快取的檔案
        if (args.length != 3) {
            System.err.printf("Usage: %s [generic options] <input> <output> <cachefile>\n", WordCount.class.getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(getClass());
        job.setJobName("MapperSideJoin");

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MapperSideJoinDCacheTextFile.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 這裡reduce的任務數設定為0, 表示map任務完成之後,不再進行reduce將直接結束job
        // 根據具體的業務設定reduce任務數
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 上面的配置跟一般的Job配置一樣的,沒啥區別
        // 這裡是關鍵,這裡指定了要DistributedCache快取的檔案的位置(注意這個檔案預設是hdfs協議訪問,
        // 所以建議放置在HDFS中),設定好這個檔案之後,在mapper或者reduce端就可以通過特定介面來訪問
        job.addCacheFile(new Path(args[2]).toUri());

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Driver(), args));
    }

}

MapperSileJoinDCacheTextFile.java-mapper實現類

package com.shell.join.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperSideJoinDCacheTextFile extends Mapper<LongWritable, Text, Text, Text> {
    private HashMap<String, String> departmentMap = new HashMap<>();
    // MapReduce中的Counter,這些設定的Counter根據使用情況將在任務執行完之後
    // 在控制檯中打印出來
    // 根據需要配置
    private enum MYCOUNTER {
        RECORD_COUNT,
        FILE_EXISTS,
        FILE_NOT_FOUND,
        SOME_OTHER_ERROR
    }

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        // 通過Job提供的介面方法,得到所有DistributedCache檔案在本地節點的存放路徑
        // 從這一點也可以知道DistributedCache檔案時放置在磁碟中,而不是記憶體裡面的
        // 根據這個路徑,就可以以本地檔案訪問的方式讀取這個DistributedCache的檔案
        Path[] cacheFiles = Job.getInstance(context.getConfiguration()).getLocalCacheFiles();
        for (Path cacheFile : cacheFiles) {
            System.out.println(cacheFile.toString());
            // 針對需要的快取檔案進行處理
            if (cacheFile.getName().toString().trim().equals("departments.txt")) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1); // Counter的運用
                loadDepartmentsHashMap(cacheFile, context);
            }
        }
    }
    // 將指定路徑的檔案內容讀取到map中
    private void loadDepartmentsHashMap(Path path, Context context) {
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new FileReader(path.toString()));
            String line = null;
            while((line = bufferedReader.readLine()) != null) {
                System.out.println(line);
                String[] departmentArray = line.split("\t");
                System.out.println(Arrays.toString(departmentArray));
                departmentMap.put(departmentArray[0].trim(), departmentArray[1].trim());
            }

        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {

                } finally {
                    bufferedReader = null;
                }
            }
        }
    }

    // 在map中完成join操作
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {

        context.getCounter(MYCOUNTER.RECORD_COUNT).increment(1);

        String employee = value.toString();
        String[] employeeArray = employee.split("\t");

        String deptId = employeeArray[6];
        System.out.println(departmentMap);
        String deptName = departmentMap.get(deptId);

        Text outputKey = new Text(employeeArray[0]);
        Text outputValue = new Text(employeeArray[1] + "\t"
                + employeeArray[2] + "\t"
                + employeeArray[3] + "\t"
                + employeeArray[4] + "\t"
                + employeeArray[5] + "\t"
                + employeeArray[6] + "\t"
                + deptName);
        context.write(outputKey, outputValue);
    }

}

1.2 實現Reduce端join

場景:現在有三個檔案,三個檔案分別存放內容如下。

UserDetails.txt
使用者操作號(唯一) 使用者名稱
9901972231,RevanthReddy
9986570643,Kumar
9980873232,Anjali
9964472219,Anusha
9980874545,Ravi
9664433221,Geetha
08563276311,Kesava
0863123456,Jim
080456123,Tom
040789123,Harry
020789456,Richa

DeliveryDetails.txt
操作號 操作結果碼
9901972231,001
9986570643,002
9980873232,003
9964472219,004
9980874545,001
9664433221,002
08563276311,003
0863123456,001
080456123,001
040789123,001
020789456,001

DeliveryStatusCodes.txt
結果碼 意義
001,Delivered
002,Pending
003,Failed
004,Resend

功能目標:根據這三個檔案,輸出每個使用者的操作結果,輸出內容如下。
使用者名稱 結果碼意義
RevanthReddy Delivered
Kumar Pending

分析:首先,我們這裡涉及多個檔案(在hive自定義UDF時,可以是多張表),那麼我們就要考慮是否適合使用DistributedCache來完成Join功能?這個怎麼做呢,我一般是通過檔案的大小,從三個檔案的內容結構上,我們很容易判斷,UserDetails和DeliveryDetails這兩個檔案的大小是在同一個量級上的,並且隨著時間的推移會變得很大,所以顯然不適合DistributedCache,而接著我們發現DeliveryStatusCodes這個檔案是解釋結果碼的意義的,大小使固定不變的,並且不會很大,甚至可以說會很小很小,所以在這樣的情況我們可以考慮使用DistributedCache快取來完成Join,將DeliveryStatusCodes作為快取檔案。那剩下的兩個檔案怎麼處理?這就可以涉及到MapReduce中的MultipleInputs這個介面,通過這個介面,我們可以實現多個Mapper類分別處理不同的輸入檔案。但這又會引出另一個問題,那就是reduce的實現定死只能有一個,這就意味著Mapper輸出結果中,key的型別必須一致(或者說繼承自同一個介面),所以有必要將不同Mapper的輸出key統一(這裡場景恰好可以通過操作號這個值來達到這個目的),並且要區分知道對應的value是屬於哪個檔案中的。順著這種思路,我有了下面的程式碼實現。

UserFileMapper.java-處理UserDetails.txt檔案的Mapper類

package com.shell.join.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// 這個map很簡單
public class UserFileMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String cellNumber;
    private String userName;
    private String fileTag = "CD~";  // 通過這個標誌來標識輸出值是屬於哪個map,將在reduce中看到作用

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] splitArray = line.split(",");
        cellNumber = splitArray[0];
        userName = splitArray[1];

        context.write(new Text(cellNumber), new Text(fileTag + userName));
    }

}

DeliveryFileMapper.java-處理DeliveryDetails.txt檔案的Mapper類

package com.shell.join.reducesidejoin;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DeliverFileMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String cellNumber;
    private String deliverCode;
    private String fileTag = "DR~";  // 跟UserFileMapper類中的作用一樣

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] splitArray = line.split(",");
        cellNumber = splitArray[0];
        deliverCode = splitArray[1];

        context.write(new Text(cellNumber), new Text(fileTag + deliverCode));
    }
}

SmsReducer.java-實現join的reduce類

package com.shell.join.reducesidejoin;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class SmsReducer extends Reducer<Text, Text, Text, Text> {

    private HashMap<String, String> deliverCodesMap = new HashMap<>(); 
    private enum MYCOUNTER {
        RECORD_COUNT,
        FILE_EXISTS,
        FILE_NOT_FOUND,
        SOME_OTHER_ERROR
    }

    @Override
    protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        //這裡前面有提到過,得到DistributedCache檔案路徑
        Path[] cacheFiles = Job.getInstance(context.getConfiguration()).getLocalCacheFiles();

        for (Path cacheFile : cacheFiles) {
            if (cacheFile.getName().trim().equals("DeliveryStatusCodes.txt")) {
                context.getCounter(MYCOUNTER.FILE_EXISTS).increment(1);
                loadDeliverStatusCodes(cacheFile, context);
            }
        }
    }
    // 讀取快取檔案
    private void loadDeliverStatusCodes(Path cacheFile, Context context) {
        BufferedReader bufferedReader = null;

        try {
            bufferedReader = new BufferedReader(new FileReader(cacheFile.toString()));
            String line = null;
            while ((line = bufferedReader.readLine()) != null) {
                String[] splitArray = line.split(",");
                deliverCodesMap.put(splitArray[0], splitArray[1]);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.FILE_NOT_FOUND).increment(1);
        } catch (IOException e) {
            e.printStackTrace();
            context.getCounter(MYCOUNTER.SOME_OTHER_ERROR).increment(1);
        } finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    bufferedReader = null;
                }
            }
        }
    }

    // 達到reduce的記錄是這樣的格式:{[操作號,(使用者名稱,結果碼)],[],...}
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String userName = null;
        String deliverReport = null;
        for (Text value : values) {
            String splitArray[] = value.toString().split("~");

            // 通過指定字首判斷這個值是來自哪個mapper,從而可以知道對應的值是什麼值(使用者名稱或者結果碼)
            if (splitArray[0].equals("CD")) {
                userName = splitArray[1];  // 獲取使用者名稱
            } else if (splitArray[0].equals("DR")) {
                deliverReport = deliverCodesMap.get(splitArray[1]); // 獲取結果碼對應的意義字串
            }
        }
        // 輸出結果
        if (userName != null && deliverReport != null) {
            context.write(new Text(userName), new Text(deliverReport));
        } else if (userName == null) {
            context.write(new Text("userName"), new Text(deliverReport));
        } else if (deliverReport == null) {
            context.write(new Text(userName), new Text("deliverReport"));
        }
    }
}

SmsDriver.java-MapReduce主程式

package com.shell.join.reducesidejoin;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmsDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 4) {
            System.err.printf("Usage: %s [generic options] <input1> <input2> <output> <cachefile>\n", SmsDriver.class.getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(getClass());
        job.setJobName("SmsDriver");

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, DeliverFileMapper.class);

        job.setReducerClass(SmsReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.addCacheFile(new Path(args[3]).toUri());

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SmsDriver(), args));
    }

}

小結:通過上述兩個例子程式,我們知道了DistributedCache的使用,以及在什麼情況下我們可以使用這個功能來幫助我們,同時我們也知道了通過MapReduce如何來完成Join功能。

2. 通過MultipleInputs以及多個Mapper完成Join

可能我們平常做得比較多的就是通過一個Mapper實現類來讀取一個檔案,然後處理,再Reducer這樣子的一個過程,而比較忽略MultipleInputs這個介面。(至少我是這樣,因為我沒怎麼弄過Hive、Hbase,這個功能在Hive的UDF中可能用的比較多)。通過MultipleInputs這個介面,在我們的MapReduce中,可以實現多個Mapper類,並且為每個類指定特定的輸入檔案目錄和輸入檔案格式等。下面我要講的就是如何通過多個Mapper實現類的方式來完成多個輸入檔案的Join功能。

場景:假設我現在有兩個檔案(並且都很大),兩個檔案的內容分別如下。

user.txt(記錄唯一)
user_id(唯一) location_id
3241 1
3321 65
4532 13
7231 32
5321 34
8321 84
1342 21
3213 23
2134 9
2345 45
3423 36
7623 98
2346 87
2133 87

transaction.txt
transaction_id product_id user_id quantity amount
10 100 3241 2 200
11 101 3321 2 200
12 102 4532 2 200
13 100 7231 2 200
14 105 5321 2 200
15 107 8321 2 200
16 200 1342 2 200
17 109 3213 2 200
18 102 2134 2 200
19 106 2345 2 200
20 108 3423 2 200
21 200 7623 2 200
22 110 2346 2 200
23 100 2133 2 200
24 135 8773 2 200
25 201 8723 2 299
25 107 8724 2 287
26 103 3876 2 150

功能需求:根據這兩個檔案,輸出每個使用者購買產品所送達的目的地,輸出格式如下。

user_id product_id location_id
3241 100 1
3321 101 65
….
3876 103 undefined(沒有使用者資訊時,不知道location_id情況下)

類似於關係型資料庫中left join之類的功能。

分析:從場景描述中,也已經知道這兩個檔案很大,所以我們可以摒棄DistributedCache這樣的方式。那麼我們將直接選擇通過MultipleInputs方式來輸入檔案,並實現多個Mapper類。那麼用這種實現方式伴隨而來的問題是什麼呢?那就是多個Mapper的輸出key統一問題;因為我們這裡是不同的Mapper處理不同的檔案,那麼就意味著不可能在mapper端完成Join,而只能選擇在Reducer端完成join,在reduce中完成join其實會有一個很大的問題需要解決,因為mapper的輸出結果,我要怎麼樣設定才可以使得需要join的記錄出現在同一個處理節點上面,否則join將不完全。換句話說就是怎樣設計Partitioner(也可以不用自己實現Partitioner,只要我們把需要join的Mapper結果的key統一之後,預設的hashPartitioner也一樣會把這些記錄shuffle到同一個reduce節點上面)。所以,最終問題就落在如何查詢兩個檔案中的公有屬性列(對應到關係型資料庫中,就是找到join列)。這樣問題就變得簡單很多了。很明顯,這裡我們可以通過user_id這個列來統一mapper的輸出key。

程式碼實現:

LeftJoinTransaction.java-處理transaction.txt檔案

package com.shell.dataalgorithms.mapreduce.chap04;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import edu.umd.cloud9.io.pair.PairOfStrings; // 這個類需要引入cloud9lib這個jar包

/**
 * input:
 *  <transaction_id><TAB><product_id><TAB><user_id><TAB><quantity><TAB><amount>
 * @author Administrator
 *
 */
public class LeftJoinTransactionMapper extends Mapper<LongWritable, Text, PairOfStrings, PairOfStrings> {

    PairOfStrings outputKey = new PairOfStrings();
    PairOfStrings outputValue = new PairOfStrings();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split("\t");
        String productId = tokens[1];
        String userId = tokens[2];

        outputKey.set(userId, "2");
        outputValue.set("P", productId);

        context.write(outputKey, outputValue);
    }

}

LeftJoinUserMapper.java-處理user.txt檔案的Mapper

package com.shell.dataalgorithms.mapreduce.chap04;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import edu.umd.cloud9.io.pair.PairOfStrings;

/**
 * input:
 *  <user_id><TAB><location_id>
 * @author Administrator
 *
 */
public class LeftJoinUserMapper extends Mapper<LongWritable, Text, PairOfStrings, PairOfStrings> {

    PairOfStrings outputKey = new PairOfStrings();
    PairOfStrings outputValue = new PairOfStrings();

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, PairOfStrings, PairOfStrings>.Context context)
            throws IOException, InterruptedException {

        String[] tokens = value.toString().split("\t");

        if (tokens.length == 2) {
            outputKey.set(tokens[0], "1");
            outputValue.set("L", tokens[1]);
            context.write(outputKey, outputValue);
        }

    }

}

LeftJoinReducer.java-Reducer實現

package com.shell.dataalgorithms.mapreduce.chap04;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import edu.umd.cloud9.io.pair.PairOfStrings;
/**
* 注意這裡reducer跟上述兩個例子中的不同,因為這裡transaction檔案中user_id是可以重複多次出現的,所以
* 這裡reducer接收的記錄將會是這樣子的 [user_id,(location_id, product_id1,product_id2,...,product_idn)],...,[..]
**/
public class LeftJoinReducer extends Reducer<PairOfStrings, PairOfStrings, Text, Text> {


    @Override
    protected void reduce(PairOfStrings key, Iterable<PairOfStrings> values,
            Reducer<PairOfStrings, PairOfStrings, Text, Text>.Context context) throws IOException, InterruptedException {

        Text productId = new Text();
        Text locationId = new Text("undefined");
        Iterator<PairOfStrings> iterator = values.iterator();
        if (iterator.hasNext()) {
            // 這裡沒再單純借用flag標誌來區分記錄值來之哪個mapper
            // 而是通過記錄排序方式以及flag來區分記錄值來自哪個mapper
            PairOfStrings firstPair = iterator.next();
            System.out.println("firstPair=" + firstPair);
            if (firstPair.getLeftElement().equals("L")) {
                locationId.set(firstPair.getRightElement());
            } else {
                context.write(new Text(firstPair.getRightElement()), locationId);
            }
        }

        while (iterator.hasNext()) {
            PairOfStrings productPair = iterator.next();
            System.out.println("productPair=" + productPair);
            productId.set(productPair.getRightElement());
            context.write(productId, locationId);
        }
    }

}

LeftJoinDriver.java

package com.shell.dataalgorithms.mapreduce.chap04;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import edu.umd.cloud9.io.pair.PairOfStrings;

public class LeftJoinDriver {

    public static void main(String[] args) throws Exception {
        Path transactions = new Path(args[0]);  // input
        Path users = new Path(args[1]); // input
        Path output = new Path(args[2]); // output

        Job job = Job.getInstance();
        job.setJarByClass(LeftJoinDriver.class);
        job.setJobName(LeftJoinDriver.class.getSimpleName());

        job.setPartitionerClass(SecondarySortPartitioner.class);

        job.setGroupingComparatorClass(SecondarySortGroupComparator.class);

        job.setReducerClass(LeftJoinReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
//      job.setOutputFormatClass(SequenceFileOutputFormat.class);

        MultipleInputs.addInputPath(job, transactions, TextInputFormat.class, LeftJoinTransactionMapper.class);
        MultipleInputs.addInputPath(job, users, TextInputFormat.class, LeftJoinUserMapper.class);

        job.setMapOutputKeyClass(PairOfStrings.class);
        job.setMapOutputValueClass(PairOfStrings.class);
        FileOutputFormat.setOutputPath(job, output);

        job.waitForCompletion(true);
    }
}

SecondarySortPartitioner.java

package com.shell.dataalgorithms.mapreduce.chap04;

import org.apache.hadoop.mapreduce.Partitioner;

import edu.umd.cloud9.io.pair.PairOfStrings;

public class SecondarySortPartitioner extends Partitioner<PairOfStrings, Object> {

    @Override
    public int getPartition(PairOfStrings key, Object value, int numPartitions) {
        return (key.getLeftElement().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}

SecondarySortGroupComparator.java

package com.shell.dataalgorithms.mapreduce.chap04;

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;

import edu.umd.cloud9.io.pair.PairOfStrings;

public class SecondarySortGroupComparator implements RawComparator<PairOfStrings> {

    @Override
    public int compare(PairOfStrings o1, PairOfStrings o2) {
        return o1.getLeftElement().compareTo(o2.getLeftElement());
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        DataInputBuffer buffer = new DataInputBuffer();
        PairOfStrings a = new PairOfStrings();
        PairOfStrings b = new PairOfStrings();

        try {
            buffer.reset(b1, s1, l1);
            a.readFields(buffer);
            buffer.reset(b2, s2, l2);
            b.readFields(buffer);

            return compare(a, b);
        } catch (IOException e) {
            return -1;
        }
    }


}