用 MapReduce 讀取 HBase 資料並寫入 MySQL(HBase -> MySQL)
首先看一下 HBase 中的資料。本系統的目標是把 HBase 中的通話記錄經 MapReduce 統計後寫入 MySQL。
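HBase 表 `dianxin:phone` 的結構如下:列族為 `info`,包含 `name`、`phone`、`nameTo`、`phoneTo`、`time`、`sum`(通話時長)等列;rowkey 格式為 `01_手機號_yyyyMMddhhmmss_1`。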
需求:根據使用者在 HBase 中的通話記錄,統計每個使用者每個月的通話總時長,並寫入 MySQL。
第一步:建立 Mapper 端程式碼
package phoneXM; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import java.io.IOException; public class PhoneMapper extends TableMapper<Text, Text> { @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //將fruit的name和color提取出來,相當於將每一行資料提取出來放入put中 Put put = new Put(key.get()); // Get get = new Get(); //遍歷行 String rowkey = new String(key.get()); String name = ""; String phone = ""; String name2 = ""; String phone2 = ""; String time = ""; String sum = ""; for (Cell cell : value.rawCells()) { if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) { /// 添 加 克 隆 列 :name if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { //將該列 cell 加入到 put 物件中 name = Bytes.toString(CellUtil.cloneValue(cell)); } else if ("phone".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { //向該列 cell 加入到 put 物件中 phone = Bytes.toString(CellUtil.cloneValue(cell)); }else if ("nameTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { //向該列 cell 加入到 put 物件中 name2 = Bytes.toString(CellUtil.cloneValue(cell)); }else if ("phoneTo".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { //向該列 cell 加入到 put 物件中 phone2 = Bytes.toString(CellUtil.cloneValue(cell)); }else if ("time".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { //向該列 cell 加入到 put 物件中 time = Bytes.toString(CellUtil.cloneValue(cell)); }else if ("sum".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) { //向該列 cell 加入到 put 物件中 sum = Bytes.toString(CellUtil.cloneValue(cell)); } } } //將從 fruit 
讀取到的每行資料寫入到 context 中作為 map 的輸出 String info = name+"-"+name2+"-"+phone+"-"+phone2+"-"+sum; System.out.println(rowkey); System.out.println(info); // 01_手機號_yyyyMMddhhmmss_1 String[] split = rowkey.split("_"); // 擷取電話號碼 String phoneNum = split[1]; // 拼接key String dataCallKe = phoneNum+"_"+split[2].substring(0,6); // 拼接value String keys = phoneNum+dataCallKe; //輸出到檔案 context.write(new Text(keys), new Text(info)); } }
第二步:建立 Reducer 端程式碼
package phoneXM; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class PhoneReducer extends Reducer<Text, Text, UserInfo,NullWritable> { private UserInfo userInfo = new UserInfo(); // private UserInfoDBWritable userInfoDBWritable = null; @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //獲取手機號 // String phone = key.toString().split("_")[1]; //拼裝資訊 Integer longTime = 0; for(Text text:values){ String time = text.toString().split("-")[4]; longTime += Integer.parseInt(time); } Text tt = new Text(longTime+""); System.out.println(key.toString()); String phone = key.toString().split("_")[0]; String month = key.toString().split("_")[1]; // id, userInfo.setPhone(phone); // account userInfo.setMonth(month); // name userInfo.setSumTime(longTime+""); // 寫入到db,放在key // userInfoDBWritable = new UserInfoDBWritable(userInfo); context.write(userInfo , null); //context.write(key,tt); } }
第三步:建立 Driver 端程式碼
package phoneXM;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import phoneXM.PhoneMapper;
import phoneXM.PhoneReducer;
import java.io.FileOutputStream;
import java.io.IOException;
//將 fruit 表中的一部分資料,通過 MR 遷入到 fruit_mr 表中。
public class Driver extends Configured implements Tool {
public static void main(String[] args) throws Exception{
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","es1,es2,es3");
configuration.set("hbase.zookeeper.property.clientport","2181");
int re = ToolRunner.run(configuration,new Driver(),args);
System.exit(re);
}
public int run(String[] args) throws Exception {
// 得到Conf
Configuration configuration = this.getConf();
//資料庫配置
DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.244.162:3306/phone","root", "123456");
Job job = Job.getInstance(configuration, "db info1");
// 建立job任務
// Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
job.setJarByClass(Driver.class);
// 配置job
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);
// 設定Mapper
TableMapReduceUtil.initTableMapperJob(
"dianxin:phone", // 資料來源的表名
scan, // scan掃描控制器
PhoneMapper.class, // 設定Mapper類
Text.class, // 設定Mapper輸入key型別
Text.class, // 設定Mapper輸出value值型別
job // 設定job
);
// 設定Reduce
/*TableMapReduceUtil.initTableReducerJob(
"hbase_mr", // 表名
Test_reduce.class, // 設定reduce
job
);*/
// 設定reduce數量,最少一個
job.setNumReduceTasks(1);
job.setReducerClass(PhoneReducer.class);
job.setOutputKeyClass(UserInfo.class);
job.setOutputValueClass(NullWritable.class);
//FileOutputFormat.setOutputPath(job, new Path("D:\\Demo\\hadoop\\ouput\\out1"));
DBOutputFormat.setOutput(job, "info1", "phone", "month", "sumTime");
job.setOutputFormatClass(DBOutputFormat.class);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
}
工具類:UserInfo(封裝寫入 MySQL 表 `info1` 的一筆記錄:手機號、月份、通話總時長)
package phoneXM;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class UserInfo implements DBWritable {
//主要是把手機號,月份,通話總時間放入到mysql,所以把這3個封裝一個類
private String phone;
private String month;
private String sumTime;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getMonth() {
return month;
}
public void setMonth(String month) {
this.month = month;
}
public String getSumTime() {
return sumTime;
}
public void setSumTime(String sumTime) {
this.sumTime = sumTime;
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1,this.getPhone());
statement.setString(2,this.getMonth());
statement.setString(3,this.getSumTime());
}
public void readFields(ResultSet resultSet) throws SQLException {
}
}
測試
執行前需先在 MySQL 的 `phone` 資料庫中建立 `info1` 表,包含 `phone`、`month`、`sumTime` 三個欄位。啟動 HBase 叢集後執行程式,再檢視該表中的資料。
至此整個流程已經完成,大家可以自行測試。