MapReduce程式設計案例
wordcount程式
package wc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat ;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMain {
public static void main(String[] args) throws Exception {
// 建立一個job和任務入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class); //main方法所在的class
//指定job的mapper和輸出的型別<k2 v2>
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class); //k2的型別
//定義自己的比較器 job.setSortComparatorClass(MyWordCountComparator.class);
//引入一個Combiner,是一種特殊的reducer
job.setCombinerClass(WordCountReducer.class );
//指定job的reducer和輸出的型別<k4 v4>
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class); //k4的型別
//指定job的輸入和輸出
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//執行job
job.waitForCompletion(true);
}
}
package wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// 泛型 k1 v1 k2 v2
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
/*
* context 表示Mapper的上下文
* 上文:HDFS
* 下文:Mapper
*/
//資料: I love Beijing
String data = value1.toString();
//分詞
String[] words = data.split(" ");
//輸出 k2 v2
for(String w:words){
context.write(new Text(w), new IntWritable(1));
}
}
}
package wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 k4 v4
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
/*
* context是reduce的上下文
* 上文
* 下文
*/
//對v3求和
int total = 0;
for(IntWritable v:v3){
total += v.get();
}
//輸出 k4 單詞 v4 頻率
context.write(k3, new IntWritable(total));
}
}
package wc;
import org.apache.hadoop.io.Text;
public class MyWordCountComparator extends Text.Comparator{
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// TODO Auto-generated method stub
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
SalTotal
package saltotal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SalaryTotalMain {
public static void main(String[] args) throws Exception {
// 建立一個job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SalaryTotalMain.class);
//指定job的mapper和輸出的型別 k2 v2
job.setMapperClass(SalaryTotalMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
//指定job的reducer和輸出的型別 k4 v4
job.setReducerClass(SalaryTotalReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入和輸出的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//執行任務
job.waitForCompletion(true);
}
}
package saltotal;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 v2
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
@Override
protected void map(LongWritable key1, Text value1,Context context)
throws IOException, InterruptedException {
//資料:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//分詞
String[] words = data.split(",");
//輸出:k2 部門號 v2 薪水
context.write(new IntWritable(Integer.parseInt(words[7])), //部門號
new IntWritable(Integer.parseInt(words[5]))); //薪水
}
}
package saltotal;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 k4 v4
public class SalaryTotalReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
protected void reduce(IntWritable k3, Iterable<IntWritable> v3,Context context)
throws IOException, InterruptedException {
//對v3求和,得到該部門的工資總額
int total = 0;
for(IntWritable v:v3){
total += v.get();
}
//輸出 部門號 總額
context.write(k3, new IntWritable(total));
}
}
Sort排序
number排序
package sort.number;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class NumberSortMain {
public static void main(String[] args) throws Exception {
// 建立一個job和任務入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(NumberSortMain.class); //main方法所在的class
//指定job的mapper和輸出的型別<k2 v2>
job.setMapperClass(NumberSortMapper.class);
job.setMapOutputKeyClass(IntWritable.class); //k2的型別
job.setMapOutputValueClass(NullWritable.class); //v2的型別
//指定自己的比較器 job.setSortComparatorClass(MyNumberComparator.class);
//指定job的輸入和輸出
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//執行job
job.waitForCompletion(true);
}
}
package sort.number;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k2 薪水 v2 空值
public class NumberSortMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
@Override
protected void map(LongWritable key1, Text value1,Context context)
throws IOException, InterruptedException {
// 資料:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//分詞
String[] words = data.split(",");
//輸出:k2 薪水 v2 空值
context.write(new IntWritable(Integer.parseInt(words[5])), NullWritable.get());
}
}
package sort.number;
import org.apache.hadoop.io.IntWritable;
//我自己的一個比較器,實現數字的降序排序
public class MyNumberComparator extends IntWritable.Comparator {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// 定義自己的排序比較規則:改成降序
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
Object排序
package sort.object;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeSortMain {
public static void main(String[] args) throws Exception {
// 建立一個job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(EmployeeSortMain.class);
//指定job的mapper和輸出的型別 k2 v2
job.setMapperClass(EmployeeSortMapper.class);
job.setMapOutputKeyClass(Employee.class);
job.setMapOutputValueClass(NullWritable.class);
//指定job的輸入和輸出的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//執行任務
job.waitForCompletion(true);
}
}
package sort.object;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//資料:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//分詞
String[] words = data.split(",");
//建立員工物件
Employee e = new Employee();
//設定員工的屬性
//員工號
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//職位
e.setJob(words[2]);
//老闆號: 注意 可能沒有老闆號
try{
e.setMgr(Integer.parseInt(words[3]));
}catch(Exception ex){
//沒有老闆號
e.setMgr(-1);
}
//入職日期
e.setHiredate(words[4]);
//月薪
e.setSal(Integer.parseInt(words[5]));
//獎金:注意:獎金也可能沒有
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
//沒有獎金
e.setComm(0);
}
//部門號
e.setDeptno(Integer.parseInt(words[7]));
//輸出
context.write(e, NullWritable.get());
}
}
package sort.object;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
//1、要把Employee作為key2,需要實現序列化
//2、員工物件Employee類,可被排序的
//資料: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements WritableComparable<Employee>{
private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;
@Override
public String toString() {
return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
}
@Override
public int compareTo(Employee o) {
// 多個列的排序:select * from emp order by deptno,sal;
//首先、按照deptno排序
if(this.deptno > o.getDeptno()){
return 1;
}else if(this.deptno < o.getDeptno()){
return -1;
}
//如果deptno相等,按照sal排序
if(this.sal >= o.getSal()){
return 1;
}else{
return -1;
}
}
// @Override
// public int compareTo(Employee o) {
// // 定義自己的比較規則:一個列排序
// //舉例:按照員工的薪水排序
// if(this.sal >= o.getSal()){
// return 1;
// }else{
// return -1;
// }
// }
@Override
public void readFields(DataInput input) throws IOException {
// 反序列化
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// 序列化
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}
Serilizable程式
java序列化
package serializable.java;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class Student implements Serializable{
//屬性
private int stuID;
private String stuName;
public Student(){
}
public int getStuID() {
return stuID;
}
public void setStuID(int stuID) {
this.stuID = stuID;
}
public String getStuName() {
return stuName;
}
public void setStuName(String stuName) {
this.stuName = stuName;
}
public static void main(String[] args) throws Exception {
// 建立一個學生物件
Student s1 = new Student();
s1.setStuID(1);
s1.setStuName("Tom");
//構造一個輸出流
ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream("d:\\temp\\student.1"));
out.writeObject(s1);
out.close();
}
}
mapreduce序列化
package serializable.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SalaryTotalMain {
public static void main(String[] args) throws Exception {
// 建立一個job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(SalaryTotalMain.class);
//指定job的mapper和輸出的型別 k2 v2
job.setMapperClass(SalaryTotalMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Employee.class);
//指定job的reducer和輸出的型別 k4 v4
job.setReducerClass(SalaryTotalReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入和輸出的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//執行任務
job.waitForCompletion(true);
}
}
package serializable.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 v2
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
@Override
protected void map(LongWritable key1, Text value1,Context context)
throws IOException, InterruptedException {
//資料:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//分詞
String[] words = data.split(",");
//建立員工物件
Employee e = new Employee();
//設定員工的屬性
//員工號
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//職位
e.setJob(words[2]);
//老闆號: 注意 可能沒有老闆號
try{
e.setMgr(Integer.parseInt(words[3]));
}catch(Exception ex){
//沒有老闆號
e.setMgr(-1);
}
//入職日期
e.setHiredate(words[4]);
//月薪
e.setSal(Integer.parseInt(words[5]));
//獎金:注意:獎金也可能沒有
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
//沒有獎金
e.setComm(0);
}
//部門號
e.setDeptno(Integer.parseInt(words[7]));
//輸出:k2 部門號 v2 員工物件
context.write(new IntWritable(e.getDeptno()), //員工的部門號
e); //員工物件
}
}
package serializable.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 k4 v4
public class SalaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable> {
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
throws IOException, InterruptedException {
//取出v3中的每個員工 進行工資求和
int total = 0;
for(Employee e:v3){
total = total + e.getSal();
}
//輸出
context.write(k3, new IntWritable(total));
}
}
package serializable.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//資料: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable{
private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;
@Override
public void readFields(DataInput input) throws IOException {
// 反序列化
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// 序列化
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
public int getEmpno() {
相關推薦
MapReduce程式設計案例系列篇(1-9)
由於本人最開始接觸大資料工作,主要以寫MapReduce程式為主,雖然現在有流行的言論稱MapReduce這種執行很慢的分散式計算程式設計框架將要被各種記憶體計算框架取代。但是MapRedcue也會吸收很多流行的記憶體計算的各種優點,我相信,將來,MapReduce絕對不會淪
MapReduce程式設計案例
wordcount程式
package wc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.
大資料_Shuffle、MapReduce程式設計案例(資料去重、多表查詢、倒排索引、使用單元測試)
一、什麼是Shuffle(洗牌) ----> MapReduce核心
1、序列化
2、排序
3、分割槽
4、合併
二、MapReduce程式設計案例 ------> 掌握方法:如何開發一個程式
1、資料
mapreduce的join演算法程式設計案例
mapreduce程式設計案例
map端的join演算法
1、原理闡述
適用於關聯表中有小表的情形,可以將小表傳送到所有的map節點,這樣map節點就可以在本地對自己讀到的大表資料進行join並輸出最終結果,可以大大提高join操作的併發度,加快處理速度
2、例項:
兩表
Linux網路程式設計案例分析
本程式碼來自於博主:輝夜星辰
本篇主要對執行程式碼中出現的問題進行分析,程式碼本身的內容後續展開討論。
伺服器端程式碼
1 /*
2 Linux網路程式設計之TCP程式設計,伺服器端讀資料
3 socket函式之後,返回值serfd,作為後面所有網路程式設計函式
MapReduce程式設計
MapReduce Coding Criteria
單個MapReduce
單元運算
以WordCount為例
分別編寫Map和Reduce函式
編寫main方法,設定環境變數,進行註冊:
《遊戲人工智慧程式設計案例精粹(修訂版)》pdf格式下載電子書免費下載
《遊戲人工智慧程式設計案例精粹(修訂版)》pdf格式下載電子書免費下載: https://u253469.ctfile.com/fs/253469-302448508
內容簡介
《遊戲人工智慧程式設計案例精粹(修訂版)》由Mat Buckland著,羅岱等人譯,是遊戲人工智慧方面的經典之作,
C程式設計--案例(2018年江蘇大學程式設計考研試題 -- 程式設計題)
題目(總):
解答(答案為博主自已所寫,並非最優程式碼,僅供參考)
第一題
題目
定義一個函式,計算並返回如下算式的值:函式式見上,在主函式中輸入10組實數a、b、c的值。。。。。。
參考程式碼(答案並非最優程式碼,僅供參考
C程式設計--案例(2017年江蘇大學程式設計考研試題 -- 程式設計題)
題目(總):
解答(答案為博主自已所寫,並非最優程式碼,僅供參考)
第一題
題目
切比雪夫多項式 (運用遞迴函式計算)
參考程式碼(答案並非最優程式碼,僅供參考)
#include<stdio.h>
C程式設計--案例(2016年江蘇大學程式設計考研試題 -- 程式設計題)
題目(總):
解答(答案為博主自已所寫,並非最優程式碼,僅供參考)
第一題
題目
已知檔案Coefficient.txt存有多個方程中係數(具體見題目總)。。。。(檔案讀寫 + 一元二次方程的處理)
參考程式碼(答案並非最優
C程式設計--案例
宣告
案例:來自2007年江蘇大學程式設計考研試題 最後一題 答案為博主自己所寫,可能不是最優的程式碼,僅供參考;
題目
參考程式碼(並非最優程式碼,僅供參考)
#include<stdio.h>
#include<math.h>
C程式設計案例(二分法求方程的根)
原理
設函式f(x)在[a,b]上連續,且f(a)*f(b)<0,則表明f(x)在[a,b]上至少有一個零點。 微積分中的介值定理。然後通過二分割槽間,縮小區間範圍,當小到一定的精確度的時候,這個x就是我們所求的近似根了。
問題描述:
用二分法求下面方程在區間
C程式設計案例(牛頓迭代法求高次方程的根)
牛頓迭代法求方程的根
1. 牛頓迭代法的幾何解釋
註解: 設
r
r
C程式設計案例(矩形法求定積分問題)
矩形法求定積分問題
程式碼實現:
#include<stdio.h>
#include<math.h>
float fsin(float x);
float func(float (*p)(float),float a,float b
C程式設計案例(求 ax^2+bx+c=0 的解)
問題:求方程:
a
x
2
C程式設計--案例(2008年江蘇大學程式設計考研試題 -- 程式設計題)
題目(總):
解答(答案為博主自已所寫,並非最優程式碼,僅供參考)
第一題
題目
給一個不多於5位的正整數,要求: 1、求出它是幾位數 2、分別輸出每一位數字 3、按逆序輸出各位數字,例如原數為321,應輸出123。
參考程式
大資料技術學習筆記之Hadoop框架基礎2-MapReduce程式設計及執行流程
一、回顧
-》hadoop的功能?
-》海量資料儲存和海量計算問題
-》分散式檔案儲存框架hdfs和
大資料之Hadoop學習——動手實戰學習MapReduce程式設計例項
文章目錄
一、MapReduce程式設計例項
1.自定義物件序列化
需求分析
報錯:Exception in thread "main" java.lang.IllegalArgumentExcept
Hadoop-MapReduce計算案例1:WordCount
案例描述:計算一個檔案中每個單詞出現的數量
程式碼:
package com.jeff.mr.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
MapReduce程式設計之Combiner
Combiner
可以理解為本地的reducer,減少了Map Tasks輸出的資料量以及資料網路傳輸量
編譯執行:
hadoop jar /home/zq/lib/HDFS_Test-1.0-SNAPSHOT.jar MapReduce.CombinerAp