1. 程式人生 > >Hadoop—MapReduce進行資料查詢和實現推簡單薦系統---練習7

Hadoop—MapReduce進行資料查詢和實現推簡單薦系統---練習7

1 執行環境說明

1.1  硬軟體環境

l  主機作業系統:Windows 64 bit,雙核4執行緒,主頻2.2G,6G記憶體

l  虛擬軟體:VMware® Workstation 9.0.0 build-812388

l  虛擬機器作業系統:CentOS 64位,單核,1G記憶體

l  JDK:1.7.0_55 64 bit

l  Hadoop:1.1.2

1.2  機器網路環境

叢集包含三個節點:1個namenode、2個datanode,其中節點之間可以相互ping通。節點IP地址和主機名分佈如下:

序號

IP地址

機器名

型別

使用者名稱

執行程序

1

10.88.147.221

hadoop1

名稱節點

hadoop

NN、SNN、JobTracer

2

10.88.147.222

hadoop2

資料節點

hadoop

DN、TaskTracer

3

10.88.147.223

hadoop3

資料節點

hadoop

DN、TaskTracer

所有節點均是CentOS6.5 64bit系統,防火牆均禁用,所有節點上均建立了一個hadoop使用者,使用者主目錄是/usr/hadoop。所有節點上均建立了一個目錄/usr/local/hadoop,並且擁有者是hadoop使用者。

2 書面作業1:計算員工相關

2.1
  書面作業1內容

(本題10選2)把作業素材demo.txt中的兩個表資料用適當的方式匯入hadoop(來自Oracle資料庫的樣板表,可考慮分成2個檔案存放,注意空值的處理)

書寫Map-Reduce程式,求出以下結果

1) 求各個部門的總工資

2) 求各個部門的人數和平均工資

3) 求每個部門最早進入公司的員工姓名

4) 求各個城市的員工的總工資

5) 列出工資比上司高的員工姓名及其工資

6) 列出工資比公司平均工資要高的員工姓名及其工資

7) 列出名字以J開頭的員工姓名及其所屬部門名稱

8) 列出工資最高的頭三名員工姓名及其工資

9) 將全體員工按照總收入(工資+提成)從高到低排列,要求列出姓名及其總收入

10) 如果每位員工只能和他的直接上司,直接下屬,同一部門的同事交流,求任何兩名員工之間若要進行資訊傳遞所需要經過的中間節點數。請評價一下這個問題是否適合使用map-reduce解決

2.2  實現過程

2.2.1準備測試資料

2.2.1.1拆分檔案

把提供的測試資料第7-8周作業素材.txt按照要求拆分成兩個檔案dept(部門)和emp(員工),其中各欄位用逗號分隔:

dept檔案內容:

10,ACCOUNTING,NEW YORK

20,RESEARCH,DALLAS

30,SALES,CHICAGO

40,OPERATIONS,BOSTON

emp檔案內容:

7369,SMITH,CLERK,7902,17-12月-80,800,,20

7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30

7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30

7566,JONES,MANAGER,7839,02-4月-81,2975,,20

7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30

7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30

7782,CLARK,MANAGER,7839,09-6月-81,2450,,10

7839,KING,PRESIDENT,,17-11月-81,5000,,10

7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30

7900,JAMES,CLERK,7698,03-12月-81,950,,30

7902,FORD,ANALYST,7566,03-12月-81,3000,,20

7934,MILLER,CLERK,7782,23-1月-82,1300,,10

2.2.1.2上傳測試檔案

使用SSH工具(參見第1、2周2.1.3.1Linux檔案傳輸工具所描述)把dept和emp兩個檔案上傳到本地目錄/usr/local/hadoop-1.1.2/input中,然後使用eclipse的HDFS外掛工具上傳該檔案到/usr/hadoop/in目錄中,如下圖所示:

clip_image002

2.2.2問題1:求各個部門的總工資

2.2.2.1問題分析

MapReduce中的join分為好幾種,比如有最常見的 reduce side join、map side join和semi join 等。reduce join 在shuffle階段要進行大量的資料傳輸,會造成大量的網路IO效率低下,而map side join 在處理多個小表關聯大表時非常有用 。

Map side join是針對以下場景進行的優化:兩個待連線表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣我們可以將小表複製多份,讓每個map task記憶體中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key的記錄,如果有,則連線後輸出即可。為了支援檔案的複製,Hadoop提供了一個類DistributedCache,使用該類的方法如下:

(1)使用者使用靜態方法DistributedCache.addCacheFile()指定要複製的檔案,它的引數是檔案的URI(如果是HDFS上的檔案,可以這樣:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的檔案拷貝到各個TaskTracker的本地磁碟上。

(2)使用者使用DistributedCache.getLocalCacheFiles()方法獲取檔案目錄,並使用標準的檔案讀寫API讀取相應的檔案。

在下面程式碼中,將會把資料量小的表(部門dept )快取在記憶體中,在Mapper階段對員工部門編號對映成部門名稱,該名稱作為key輸出到Reduce中,在Reduce中計算按照部門計算各個部門的總工資。

2.2.2.2處理流程圖

clip_image004

2.2.2.3編寫程式碼

Q1SumDeptSalary.java程式碼:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
publicclass Q1SumDeptSalary extends Configured implements Tool {
 
    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {
 
        // 用於快取 dept檔案中的資料
        private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;
 
        // 此方法會在Map方法執行之前執行且執行一次
        @Override
        protectedvoid setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
 
                // 從當前作業中獲取要快取的檔案
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
 
                    // 對部門檔案欄位進行拆分並快取到deptMap中
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {
                           
                            // 對部門檔案欄位進行拆分並快取到deptMap中
                            // 其中Map中key為部門編號,value為所在部門名稱
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
 
publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 
            // 對員工檔案欄位進行拆分
            kv = value.toString().split(",");
 
            // map join: 在map階段過濾掉不需要的資料,輸出key為部門名稱和value為員工工資
            if (deptMap.containsKey(kv[7])) {
                if (null != kv[5] && !"".equals(kv[5].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                }
            }
        }
    }
 
    publicstaticclass Reduce extends Reducer<Text, Text, Text, LongWritable> {
 
publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
            // 對同一部門的員工工資進行求和
            long sumSalary = 0;
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
            }
 
            // 輸出key為部門名稱和value為該部門員工工資總和
            context.write(key, new LongWritable(sumSalary));
        }
    }
 
    @Override
    publicint run(String[] args) throws Exception {
 
        // 例項化作業物件,設定作業名稱、Mapper和Reduce類
        Job job = new Job(getConf(), "Q1SumDeptSalary");
        job.setJobName("Q1SumDeptSalary");
        job.setJarByClass(Q1SumDeptSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
 
        // 設定輸入格式類
        job.setInputFormatClass(TextInputFormat.class);
 
        // 設定輸出格式
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        // 第1個引數為快取的部門資料路徑、第2個引數為員工資料路徑和第3個引數為輸出路徑
       String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
       DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
 
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
 
    /**
     * 主方法,執行入口
     * @param args 輸入引數
     */
    publicstaticvoid main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
        System.exit(res);
    }
}


2.2.2.4配置執行引數

新建一個Java應用執行程式,需要在Arguments頁籤填寫Q1SumDeptSalary執行的部門資料路徑、員工資料路徑和輸出路徑三個引數,需要注意的是hdfs的路徑引數路徑需要全路徑,否則執行會報錯:

l  部門資料路徑:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部門資料將快取在各執行任務的節點內容中,可以提供處理的效率

l  員工資料路徑:hdfs:// hadoop1:9000/usr/hadoop/in/emp

l  輸出路徑:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q1

clip_image006

2.2.2.5執行並檢視結果

設定執行引數完畢後,點選執行按鈕:

clip_image008

執行成功後,重新整理CentOS HDFS中的輸出路徑/usr/hadoop/out/week7_q1目錄,開啟part-r-00000檔案,可以看到執行結果:

ACCOUNTING8750

RESEARCH6775

SALES  9400

clip_image010

2.2.3問題2:求各個部門的人數和平均工資

2.2.3.1問題分析

求各個部門的人數和平均工資,需要得到各部門工資總數和部門人數,通過兩者相除獲取各部門平均工資。首先和問題1類似在Mapper的Setup階段快取部門資料,然後在Mapper階段抽取出部門編號和員工工資,利用快取部門資料把部門編號對應為部門名稱,接著在Shuffle階段把傳過來的資料處理為部門名稱對應該部門所有員工工資的列表,最後在Reduce中按照部門歸組,遍歷部門所有員工,求出總數和員工數,輸出部門名稱和平均工資。

2.2.3.2處理流程圖

clip_image012

2.2.3.3編寫程式碼

Q2DeptNumberAveSalary.java程式碼:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
publicclass Q2DeptNumberAveSalary extends Configured implements Tool {
 
    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {
 
        // 用於快取 dept檔案中的資料
        private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;
 
        // 此方法會在Map方法執行之前執行且執行一次
        @Override
        protectedvoid setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // 從當前作業中獲取要快取的檔案
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
 
                    // 對部門檔案欄位進行拆分並快取到deptMap中
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {
                           
                            // 對部門檔案欄位進行拆分並快取到deptMap中
                            // 其中Map中key為部門編號,value為所在部門名稱
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
 
        publicvoid map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 
            // 對員工檔案欄位進行拆分
            kv = value.toString().split(",");
 
            // map join: 在map階段過濾掉不需要的資料,輸出key為部門名稱和value為員工工資
            if (deptMap.containsKey(kv[7])) {
                if (null != kv[5] && !"".equals(kv[5].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                }
            }
        }
    }
 
    publicstaticclass Reduce extends Reducer<Text, Text, Text, Text> {
 
        publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
            long sumSalary = 0;
            int deptNumber = 0;
 
            // 對同一部門的員工工資進行求和
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
                deptNumber++;
            }
 
            // 輸出key為部門名稱和value為該部門員工工資平均值
            context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));
        }
    }
 
    @Override
    publicint run(String[] args) throws Exception {
 
        // 例項化作業物件,設定作業名稱、Mapper和Reduce類
        Job job = new Job(getConf(), "Q2DeptNumberAveSalary");
        job.setJobName("Q2DeptNumberAveSalary");
        job.setJarByClass(Q2DeptNumberAveSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
 
        // 設定輸入格式類
        job.setInputFormatClass(TextInputFormat.class);
 
        // 設定輸出格式類
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        // 第1個引數為快取的部門資料路徑、第2個引數為員工資料路徑和第3個引數為輸出路徑
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
 
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
 
    /**
     * 主方法,執行入口
     * @param args 輸入引數
     */
    publicstaticvoid main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);
        System.exit(res);
    }
}


2.2.3.4配置執行引數

新建一個Java應用執行程式,需要在Arguments頁籤填寫Q2DeptNumberAveSalary執行的部門資料路徑、員工資料路徑和輸出路徑三個引數,需要注意的是hdfs的路徑引數路徑需要全路徑,否則執行會報錯:

l  部門資料路徑:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部門資料將快取在各執行任務的節點內容中,可以提供處理的效率

l  員工資料路徑:hdfs:// hadoop1:9000/usr/hadoop/in/emp

l  輸出路徑:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q2

clip_image014

2.2.3.5執行並檢視結果

設定執行引數完畢後,點選執行按鈕:

clip_image016

執行成功後,重新整理CentOS HDFS中的輸出路徑/usr/hadoop/out/week7_q2目錄,開啟part-r-00000檔案,可以看到執行結果:

ACCOUNTINGDept Number:3,Ave Salary:2916

RESEARCHDept Number:3,Ave Salary:2258

SALES  Dept Number:6,Ave Salary:1566

clip_image018

2.2.4問題3:求每個部門最早進入公司的員工姓名

2.2.4.1問題分析

求每個部門最早進入公司員工姓名,需要得到各部門所有員工的進入公司日期,通過比較獲取最早進入公司員工姓名。首先和問題1類似在Mapper的Setup階段快取部門資料,然後Mapper階段抽取出key為部門名稱(利用快取部門資料把部門編號對應為部門名稱),value為員工姓名和進入公司日期,接著在Shuffle階段把傳過來的資料處理為部門名稱對應該部門所有員工+進入公司日期的列表,最後在Reduce中按照部門歸組,遍歷部門所有員工,找出最早進入公司的員工並輸出。

2.2.4.2處理流程圖

clip_image020

2.2.4.3編寫程式碼

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
publicclass Q3DeptEarliestEmp extends Configured implements Tool {
 
    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {
 
        // 用於快取 dept檔案中的資料
        private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;
 
        // 此方法會在Map方法執行之前執行且執行一次
        @Override
        protectedvoid setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // 從當前作業中獲取要快取的檔案
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {
 
                            // 對部門檔案欄位進行拆分並快取到deptMap中
                            // 其中Map中key為部門編號,value為所在部門名稱
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
 
        publicvoid map(LongWritable key, Text value, Context context) throws IOException,     InterruptedException {
 
            // 對員工檔案欄位進行拆分
            kv = value.toString().split(",");
 
            // map join: 在map階段過濾掉不需要的資料
            // 輸出key為部門名稱和value為員工姓名+","+員工進入公司日期
            if (deptMap.containsKey(kv[7])) {
                if (null != kv[4] && !"".equals(kv[4].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim()                    + "," +kv[4].trim()));
                }
            }
        }
    }
 
    publicstaticclass Reduce extends Reducer<Text, Text, Text, Text> {
 
        publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException,        InterruptedException {
 
            // 員工姓名和進入公司日期
            String empName = null;
            String empEnterDate = null;
 
            // 設定日期轉換格式和最早進入公司的員工、日期
            DateFormat df = new SimpleDateFormat("dd-MM月-yy");
 
            Date earliestDate = new Date();
            String earliestEmp = null;
 
            // 遍歷該部門下所有員工,得到最早進入公司的員工資訊
            for (Text val : values) {
                empName = val.toString().split(",")[0];
                empEnterDate = val.toString().split(",")[1].toString().trim();
                try {
                    System.out.println(df.parse(empEnterDate));
                    if (df.parse(empEnterDate).compareTo(earliestDate) < 0) {
                        earliestDate = df.parse(empEnterDate);
                        earliestEmp = empName;
                    }
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
 
            // 輸出key為部門名稱和value為該部門最早進入公司員工
            context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter            date:" + newSimpleDateFormat("yyyy-MM-dd").format(earliestDate)));
        }
    }
 
    @Override
    publicint run(String[] args) throws Exception {
 
        // 例項化作業物件,設定作業名稱
        Job job = new Job(getConf(), "Q3DeptEarliestEmp");
        job.setJobName("Q3DeptEarliestEmp");
 
        // 設定Mapper和Reduce類
        job.setJarByClass(Q3DeptEarliestEmp.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
 
        // 設定輸入格式類
        job.setInputFormatClass(TextInputFormat.class);
 
        // 設定輸出格式類
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        // 第1個引數為快取的部門資料路徑、第2個引數為員工資料路徑和第三個引數為輸出路徑
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
 
        job.waitForCompletion(true);
        returnjob.isSuccessful() ? 0 : 1;
    }
 
    /**
     * 主方法,執行入口
     * @param args 輸入引數
     */
    publicstaticvoid main(String[] args) throws Exception {
        intres = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);
        System.exit(res);
    }
}


2.2.4.4配置執行引數

新建一個Java應用執行程式,需要在Arguments頁籤填寫Q3DeptEarliestEmp執行的部門資料路徑、員工資料路徑和輸出路徑三個引數,需要注意的是hdfs的路徑引數路徑需要全路徑,否則執行會報錯:

l  部門資料路徑:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部門資料將快取在各執行任務的節點內容中,可以提供處理的效率

l  員工資料路徑:hdfs:// hadoop1:9000/usr/hadoop/in/emp

l  輸出路徑:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q3

clip_image022

2.2.4.5執行並檢視結果

設定執行引數完畢後,點選執行按鈕:

clip_image024

執行成功後,重新整理CentOS HDFS中的輸出路徑/usr/hadoop/out/week7_q3目錄,開啟part-r-00000檔案,可以看到執行結果:

ACCOUNTINGThe earliest emp of dept:CLARK, Enter date:1981-06-09

RESEARCHThe earliest emp of dept:SMITH, Enter date:1980-12-17

SALES  The earliest emp of dept:ALLEN, Enter date:1981-02-20

clip_image026

2.2.5問題4:求各個城市的員工的總工資

2.2.5.1問題分析

求各個城市員工的總工資,需要得到各個城市所有員工的工資,通過對各個城市所有員工工資求和得到總工資。首先和問題1類似在Mapper的Setup階段快取部門對應所在城市資料,然後在Mapper階段抽取出key為城市名稱(利用快取資料把部門編號對應為所在城市名稱),value為員工工資,接著在Shuffle階段把傳過來的資料處理為城市名稱對應該城市所有員工工資,最後在Reduce中按照城市歸組,遍歷城市所有員工,求出工資總數並輸出。

2.2.5.2處理流程圖

clip_image028

2.2.5.3編寫程式碼

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
publicclass Q4SumCitySalary extends Configured implements Tool {
 
    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {
 
        // 用於快取 dept檔案中的資料
        private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;
 
        // 此方法會在Map方法執行之前執行且執行一次
        @Override
        protectedvoid setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // 從當前作業中獲取要快取的檔案
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {
 
                            // 對部門檔案欄位進行拆分並快取到deptMap中
                            // 其中Map中key為部門編號,value為所在城市名稱
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[2]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
 
        publicvoid map(LongWritable key, Text value, Context context) throws IOException,     InterruptedException {
 
            // 對員工檔案欄位進行拆分
            kv = value.toString().split(",");
 
            // map join: 在map階段過濾掉不需要的資料,輸出key為城市名稱和value為員工工資
            if (deptMap.containsKey(kv[7])) {
                if (null != kv[5] && !"".equals(kv[5].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                }
            }
        }
    }
 
    publicstaticclass Reduce extends Reducer<Text, Text, Text, LongWritable> {
 
        publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException,        InterruptedException {
 
            // 對同一城市的員工工資進行求和
            long sumSalary = 0;
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
            }
 
            // 輸出key為城市名稱和value為該城市工資總和
            context.write(key, new LongWritable(sumSalary));
        }
    }
 
    @Override
    publicint run(String[] args) throws Exception {
 
        // 例項化作業物件,設定作業名稱
        Job job = new Job(getConf(), "Q4SumCitySalary");
        job.setJobName("Q4SumCitySalary");
 
        // 設定Mapper和Reduce類
        job.setJarByClass(Q4SumCitySalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
 
        // 設定輸入格式類
        job.setInputFormatClass(TextInputFormat.class);
 
        // 設定輸出格式類
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        // 第1個引數為快取的部門資料路徑、第2個引數為員工資料路徑和第3個引數為輸出路徑
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
 
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
 
    /**
     * 主方法,執行入口
     * @param args 輸入引數
     */
    publicstaticvoid main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);
        System.exit(res);
    }
}


2.2.5.4配置執行引數

新建一個Java應用執行程式,需要在Arguments頁籤填寫Q4SumCitySalary執行的部門資料路徑、員工資料路徑和輸出路徑三個引數,需要注意的是hdfs的路徑引數路徑需要全路徑,否則執行會報錯:

l  部門資料路徑:hdfs://hadoop1:9000/usr/hadoop/in/dept ,部門資料將快取在各執行任務的節點內容中,可以提供處理的效率

l  員工資料路徑:hdfs:// hadoop1:9000/usr/hadoop/in/emp

l  輸出路徑:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q4

clip_image030

2.2.5.5執行並檢視結果

設定執行引數完畢後,點選執行按鈕:

clip_image032

執行成功後,重新整理CentOS HDFS中的輸出路徑/usr/hadoop/out/week7_q4目錄,開啟part-r-00000檔案,可以看到執行結果:

CHICAGO  9400

DALLAS     6775

NEW YORK     8750

clip_image034

2.2.6問題5:列出工資比上司高的員工姓名及其工資

2.2.6.1問題分析

求工資比上司高的員工姓名及工資,需要得到上司工資及上司所有下屬員工,通過比較他們工資高低得到比上司工資高的員工。在Mapper階段輸出經理資料和員工對應經理表資料,其中經理資料key為員工編號、value為"M,該員工工資",員工對應經理表資料key為經理編號、value為"E,該員工姓名,該員工工資";然後在Shuffle階段把傳過來的經理資料和員工對應經理表資料進行歸組,如編號為7698員工,value中標誌M為自己工資,value中標誌E為其下屬姓名及工資;最後在Reduce中遍歷比較員工與經理工資高低,輸出工資高於經理的員工。

2.2.6.2處理流程圖

clip_image036

2.2.6.3編寫程式碼

import java.io.IOException;
import java.util.HashMap;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
publicclass Q5EarnMoreThanManager extends Configured implements Tool {
 
    publicstaticclass MapClass extends Mapper<LongWritable, Text, Text, Text> {
 
        publicvoid map(LongWritable key, Text value, Context context) throws IOException,     InterruptedException {
 
            // 對員工檔案欄位進行拆分
            String[] kv = value.toString().split(",");
 
            // 輸出經理表資料,其中key為員工編號和value為M+該員工工資
            context.write(new Text(kv[0].toString()), new Text("M," + kv[5]));
 
            // 輸出員工對應經理表資料,其中key為經理編號和value為(E,該員工姓名,該員工工資)
            if (null != kv[3] && !"".equals(kv[3].toString())) {
                context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5]));
            }
        }
    }
 
    publicstaticclass Reduce extends Reducer<Text, Text, Text, Text> {
 
        publicvoid reduce(Text key, Iterable<Text> values, Context context) throws IOException,        InterruptedException {
 
            // 定義員工姓名、工資和存放部門員工Map
            String empName;
            long empSalary = 0;
            HashMap<String, Long> empMap = new HashMap<String, Long>();
           
            // 定義經理工資變數
            long mgrSalary = 0;
 
            for (Text val : values) {
                if (val.toString().startsWith("E")) {
                    // 當是員工標示時,獲取該員工對應的姓名和工資並放入Map中
                    empName = val.toString().split(",")[1];
                    empSalary = Long.parseLong(val.toString().split(",")[2]);
                    empMap.put(empName, empSalary);
                } else {
                    // 當時經理標誌時,獲取該經理工資
                    mgrSalary = Long.parseLong(val.toString().split(",")[1]);
                }
            }
 
            // 遍歷該經理下屬,比較員工與經理工資高低,輸出工資高於經理的員工
            for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) {
                if (entry.getValue() > mgrSalary) {
                    context.write(new Text(entry.getKey()), new Text("" + entry.getValue()));
                }
            }
        }
    }
 
    @Override
    publicint run(String[] args) throws Exception {
 
        // 例項化作業物件,設定作業名稱
        Job job = new Job(getConf(), "Q5EarnMoreThanManager");
        job.setJobName("Q5EarnMoreThanManager");
 
        // 設定Mapper和Reduce類
        job.setJarByClass(Q5EarnMoreThanManager.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
 
        // 設定輸入格式類
        job.setInputFormatClass(TextInputFormat.class);
 
        // 設定輸出格式類
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        // 第1個引數為員工資料路徑和第2個引數為輸出路徑
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
 
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
 
    /**
     * 主方法,執行入口
     * @param args 輸入引數
     */
    publicstaticvoid main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);
        System.exit(res);
    }
}


2.2.6.4配置執行引數

新建一個Java應用執行程式,需要在Arguments頁籤填寫Q5EarnMoreThanManager執行的員工資料路徑和輸出路徑兩個引數,需要注意的是hdfs的路徑引數路徑需要全路徑,否則執行會報錯:

l  員工資料路徑:hdfs:// hadoop1:9000/usr/hadoop/in/emp

l  輸出路徑:hdfs:// hadoop1:9000/usr/hadoop/out/week7_q5

clip_image038

2.2.6.5執行並檢視結果

設定執行引數完畢後,點選執行按鈕:

clip_image040

執行成功後,重新整理CentOS HDFS中的輸出路徑/usr/hadoop/out/week7_q5目錄,開啟part-r-00000檔案,可以看到執行結果:

FORD  3000

clip_image042

2.2.7問題6:列出工資比公司平均工資要高的員工姓名及其工資

2.2.7.1問題分析

求工資比公司平均工資要高的員工姓名及工資,需要得到公司的平均工資和所有員工工資,通過比較得出工資比平均工資高的員工姓名及工資。這個問題可以分兩個作業進行解決,先求出公司的平均工資,然後與所有員工進行比較得到結果;也可以在一個作業進行解決,這裡就得使用作業setNumReduceTasks方法,設定Reduce任務數為1,保證每次執行一個reduce任務,從而能先求出平均工資,然後進行比較得出結果。

在Mapper階段輸出兩份所有員工資料,其中一份key為0、value為該員工工資,另外一份key為0、value為"該員工姓名 ,員工工資";然後在Shuffle階段把傳過來資料按照key進行歸組,在該任務中有key值為0和1兩組資料;最後在Reduce中對key值0的所有員工求工資總數和員工數,獲得平均工資;對key值1,比較員工與平均工資的大小,輸出比平均工資高的員工和對應的工資。

2.2.7.2處理流程圖

clip_image044

2.2.7.3編寫程式碼

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
publicclass Q6HigherThanAveSalary extends Configured implements Tool {
 
    publicstaticclass MapClass extends Mapper<LongWritable, Text, IntWritable,