Advanced MapReduce Features 1: Serialization

*) Java serialization: if a class implements Java's Serializable interface, objects of that class can be written to an OutputStream and read back from an InputStream (via ObjectOutputStream/ObjectInputStream).

*) MapReduce serialization:
 (1) All of Hadoop's built-in data types implement Hadoop serialization.
 (2) If a class implements Hadoop's serialization interface (Writable), objects of that class can be used as the input and output (key/value) of Map and Reduce.
 (3) Fields must be deserialized in exactly the same order in which they were serialized.

===================================================================

*) Java serialization: if a class implements Java's Serializable interface, objects of that class can be written to an OutputStream and read back from an InputStream.


---------------------------------------------------------------------------------------------------------------

1. Define the Student class, implementing the Serializable interface

package demo.serializable.java;

import java.io.Serializable;

public class Student implements Serializable {

    private int stuID;
    private String stuName;

    public Student() {
    }

    public int getStuID() {
        return stuID;
    }

    public void setStuID(int stuID) {
        this.stuID = stuID;
    }

    public String getStuName() {
        return stuName;
    }

    public void setStuName(String stuName) {
        this.stuName = stuName;
    }
}

------------------------------------------------------------------------------------------------------

package demo.serializable.java;

import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

public class StudentMain {

    public static void main(String[] args) throws Exception {
        // Create a Student object
        Student s = new Student();
        s.setStuID(1);
        s.setStuName("Tom");

        // Write the object to a file -----> serialization
        OutputStream out = new FileOutputStream("d:\\temp\\student.ooo");
        ObjectOutputStream objOut = new ObjectOutputStream(out);

        // Write out the object
        objOut.writeObject(s);

        // Close the streams
        objOut.close();
        out.close();

        System.out.println("Done");
    }
}
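
The reverse direction works the same way: to restore the Student object from the file, read it back through an ObjectInputStream. A minimal sketch, reading the same d:\temp\student.ooo file written above (the class name StudentRead is just for illustration):

package demo.serializable.java;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;

public class StudentRead {

    public static void main(String[] args) throws Exception {
        // Open the file written by StudentMain -----> deserialization
        InputStream in = new FileInputStream("d:\\temp\\student.ooo");
        ObjectInputStream objIn = new ObjectInputStream(in);

        // Read the object back and cast it to Student
        Student s = (Student) objIn.readObject();
        System.out.println(s.getStuID() + "\t" + s.getStuName());

        objIn.close();
        in.close();
    }
}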

====================================================================

MapReduce serialization:
 (1) All of Hadoop's built-in data types implement Hadoop serialization.
 (2) If a class implements Hadoop's serialization interface (Writable), objects of that class can be used as the input and output (key/value) of Map and Reduce.
 (3) Fields must be deserialized in exactly the same order in which they were serialized. See the short sketch below.
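
Rule (3) means readFields must read the fields back in exactly the order write emitted them. A minimal sketch using a hypothetical two-field Writable (for illustration only, not part of the Employee example that follows):

package demo.serializable.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical two-field Writable, shown only to illustrate rule (3)
public class Pair implements Writable {

    private int id;
    private String name;

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.id);    // 1st: the int
        output.writeUTF(this.name);  // 2nd: the String
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Must mirror write() exactly: int first, then String.
        // Calling readUTF() first would consume the int's bytes as string
        // data and corrupt the object or throw an IOException.
        this.id = input.readInt();
        this.name = input.readUTF();
    }
}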

------------------------------------------------------------------------------------------------------------------

1. Define the Employee class, implementing Hadoop's Writable interface

package demo.serializable.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Employee record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable {

    private int empno;       // employee number
    private String ename;    // name
    private String job;      // job title
    private int mgr;         // manager's employee number
    private String hiredate; // hire date
    private int sal;         // monthly salary
    private int comm;        // commission
    private int deptno;      // department number

    @Override
    public String toString() {
        return "[" + this.empno + "\t" + this.ename + "\t" + this.sal + "\t" + this.deptno + "]";
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialization: write the object's fields out
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialization: read the fields back in the same order they were written
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    public int getEmpno() {
        return empno;
    }
    public void setEmpno(int empno) {
        this.empno = empno;
    }
    public String getEname() {
        return ename;
    }
    public void setEname(String ename) {
        this.ename = ename;
    }
    public String getJob() {
        return job;
    }
    public void setJob(String job) {
        this.job = job;
    }
    public int getMgr() {
        return mgr;
    }
    public void setMgr(int mgr) {
        this.mgr = mgr;
    }
    public String getHiredate() {
        return hiredate;
    }
    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }
    public int getSal() {
        return sal;
    }
    public void setSal(int sal) {
        this.sal = sal;
    }
    public int getComm() {
        return comm;
    }
    public void setComm(int comm) {
        this.comm = comm;
    }
    public int getDeptno() {
        return deptno;
    }
    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
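
The write/readFields pair can be exercised outside of a MapReduce job, which makes rule (3) easy to verify. A minimal sketch using in-memory streams (the class name EmployeeRoundTrip is just for illustration):

package demo.serializable.mr;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class EmployeeRoundTrip {

    public static void main(String[] args) throws Exception {
        Employee e = new Employee();
        e.setEmpno(7654);
        e.setEname("MARTIN");
        e.setJob("SALESMAN");
        e.setMgr(7698);
        e.setHiredate("1981/9/28");
        e.setSal(1250);
        e.setComm(1400);
        e.setDeptno(30);

        // Serialize: write() sends the fields to a byte buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        e.write(new DataOutputStream(buffer));

        // Deserialize: readFields() rebuilds the object from the same bytes
        Employee copy = new Employee();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy);   // [7654    MARTIN    1250    30]
    }
}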

---------------------------------------------------------------------------------------------------------

2. Mapper stage: EmployeeMapper extends Mapper&lt;LongWritable, Text, LongWritable, Employee&gt;

package demo.serializable.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//                                                            k2: employee number      v2: Employee object
public class EmployeeMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = v1.toString();

        // Split the record on commas
        String[] words = data.split(",");

        // Create an Employee object
        Employee e = new Employee();

        // Employee number
        e.setEmpno(Integer.parseInt(words[0]));
        // Name
        e.setEname(words[1]);

        // Job title
        e.setJob(words[2]);

        // Manager number
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception ex) {
            // Manager number is null
            e.setMgr(0);
        }

        // Hire date
        e.setHiredate(words[4]);

        // Salary
        e.setSal(Integer.parseInt(words[5]));

        // Commission
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception ex) {
            // No commission
            e.setComm(0);
        }

        // Department number
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit: key = employee number, value = the Employee object
        context.write(new LongWritable(e.getEmpno()), e);
    }
}
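
For the sample record shown in the comment above, 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30, this mapper emits the key 7654 and an Employee value whose toString() prints as [7654	MARTIN	1250	30].
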
-----------------------------------------------------------------------------------------------------

3. Reducer stage

package demo.serializable.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {

    @Override
    protected void reduce(LongWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        // Pass each Employee through unchanged: key = employee number, value = the Employee object
        for (Employee e : v3) {
            context.write(k3, e);
        }
    }
}
---------------------------------------------------------------------------------------------------------------

4. Main program (driver)

package demo.serializable.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());
        // Entry point of the job
        job.setJarByClass(EmployeeMain.class);

        // Mapper class and its output types
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);

        // Reducer class and its output types
        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        // HDFS input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
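
To run the job, package the classes into a jar and submit it with hadoop jar, passing the HDFS input and output paths as args[0] and args[1]. A minimal usage sketch (the jar name and the paths /input/emp.csv and /output/ser are placeholders, not from the original):

hadoop jar serializable-demo.jar demo.serializable.mr.EmployeeMain /input/emp.csv /output/ser

Note that the output path must not already exist, otherwise FileOutputFormat will fail the job at submission time.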