用 MapReduce 實現 SQL 的查詢功能
阿新 • • 發佈:2018-12-30
查詢員工表(emp)中每個部門所有員工的工資總和
SQL 語句為:select deptno , sum(sal) from emp group by deptno order by deptno;
下面用 MapReduce 程式實現與該 SQL 語句相同的查詢功能。
Mapper程式如下:
package SalaryTotal; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> { @Override protected void map(LongWritable key1, Text value1,Context context) throws IOException, InterruptedException { /* * context 表示Mapper的上下文 * 上文:HDFS * 下文:Mapper */ //資料:7900,JAMES,CLERK,7698,1981/12/3,950,,30 String data = value1.toString(); //分詞 String[] words = data.split(","); //輸出 k2部門號 v2薪水 for(String w:words) { context.write(new IntWritable(Integer.parseInt(words[7])), new IntWritable(Integer.parseInt(words[5]))); } } }
Reducer程式如下:
package SalaryTotal; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; public class SalaryTotalReucer extends Reducer<IntWritable, IntWritable,IntWritable ,IntWritable> { @Override protected void reduce(IntWritable k3, Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException { //對v3求和,得到該部門的工資總額 int total = 0; for(IntWritable v:v3) { total += v.get(); } //輸出 context.write(k3, new IntWritable(total)); } }
主程式如下:
package SalaryTotal; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SalaryTotalMain { public static void main(String[] args) throws Exception { //建立一個job Job job = Job.getInstance(new Configuration()); job.setJarByClass(SalaryTotalMain.class); //指定job的mapper和輸出的型別 k2 v2 job.setMapperClass(SalaryTotalMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); //指定job的reducer和輸出的型別 k4 v4 job.setReducerClass(SalaryTotalReucer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //指定job的輸入和輸出的路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //執行任務 job.waitForCompletion(true); } }
在Hadoop上執行此MapReduce程式
hadoop jar salary.jar /scott/emp.csv /output/0814/salary
檢視執行的結果(例如:hdfs dfs -cat /output/0814/salary/part-r-00000)
由此結果可以看出,此MapReduce程式和SQL 語句的執行結果一樣,即MapReduce程式正確。