使用MapReduce讀取HBase資料儲存到MySQL
阿新 • 發佈:2018-12-08
Mapper讀取HBase資料
package MapReduce; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import java.io.IOException; public class CallMapper extends TableMapper<phoneInfoDBWritable,phoneInfoDBWritable>{ //將log的caller,callee,time,dur提取出來,相當於將每一行資料讀取出來放入到 phoneInfo 物件中。 private phoneInfo pp = new phoneInfo(); private phoneInfoDBWritable pDB = null; @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //獲取rowkey String rowkey = new String(key.get()); //獲取一行資料 Cell[] cells = value.rawCells(); // 獲取的資料,通話時長,日期 String caller = ""; String callee = ""; String time = ""; String dur = ""; String flag = ""; String dateCallk = ""; //迴圈取出 for (Cell cell :cells){ // 取出行名稱 String lineName = new String(CellUtil.cloneQualifier(cell)); // 判斷打電話的人 if(lineName.equals("caller")){ caller = new String(CellUtil.cloneValue(cell)); } // 接電話的人 if(lineName.equals("callee")){ callee = new String(CellUtil.cloneValue(cell)); } // 判斷日期 if(lineName.equals("time")){ time = new String(CellUtil.cloneValue(cell)); } // 判斷時長 if(lineName.equals("dur")){ dur = new String(CellUtil.cloneValue(cell)); } // 判斷日期 if(lineName.equals("flag")){ flag = new String(CellUtil.cloneValue(cell)); } //01_手機號_yyyMMddhhmmss_1 String[] split = rowkey.split("_"); //擷取打電話的人的電話號碼 String phoneNum = split[1]; //拼接key dateCallk = phoneNum + "_" + split[2].substring(0, 6); //輸出到檔案 } //測試輸出內容 pp.setCaller(caller); pp.setCallee(callee); pp.setTime(time); pp.setDur(dur); pp.setFlag(flag); //System.err.println("rowkey: " + rowkey + "-" +caller+ "-" +callee+ "-" + time + "-" +dur+ "-" +flag); //String string = "rowkey: " + rowkey + "-" +caller+ "-" +callee+ "-" + time + "-" +dur+ "-" +flag; //將資料寫入到mysql中 pDB = new phoneInfoDBWritable(pp); context.write(pDB,null); } }
Driver配置分發任務
package MapReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; public class MRRunner { public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); //建立configuration conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3"); conf.set("hbase.zookeeper.property.clientPort", "2181"); Job job = Job.getInstance(conf, "db store"); //實現與資料庫的連線 DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/callphone", "root","root"); //將從HBase表中獲取的資料封裝寫入到資料庫表的格式 DBOutputFormat.setOutput(job, "phone", "caller", "callee", "time", "dur","flag"); //設定Driver job.setJarByClass(MRRunner.class); //設定資料輸出學出到mysql的類格式 job.setOutputFormatClass(DBOutputFormat.class); //掃描HBase表 Scan scan = new Scan(); scan.setCacheBlocks(false); scan.setCaching(500); //設定Mapper job.setMapperClass(CallMapper.class); TableMapReduceUtil.initTableMapperJob( "phone:log", scan, CallMapper.class, phoneInfoDBWritable.class, phoneInfoDBWritable.class, job); // 設定Reduce數量,沒有使用到Reducer job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
phoneInfo封裝從HBase讀取到的資料
package MapReduce; /** * 構建phoneInfo類,將HBase表中的資料儲存到phoneInfo物件中 * 實現封裝資料 */ public class phoneInfo{ private String caller; private String callee; private String time; private String dur; private String flag; public String getCaller() { return caller; } public void setCaller(String caller) { this.caller = caller; } public String getCallee() { return callee; } public void setCallee(String callee) { this.callee = callee; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getDur() { return dur; } public void setDur(String dur) { this.dur = dur; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } }
phoneInfoDBWritable實現DBWritable,用於包裝phoneInfo物件並將其寫入MySQL
package MapReduce; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; /** * 編寫phoneInfoDBWritable類實現DBWritable,完成HBase的資料寫入到指定的MySQL的序列化 */ public class phoneInfoDBWritable implements DBWritable { private phoneInfo phoneinfo; public phoneInfoDBWritable() { } public phoneInfoDBWritable(phoneInfo phoneinfo) { this.phoneinfo = phoneinfo; } public void write(PreparedStatement statement) throws SQLException { statement.setString(1, phoneinfo.getCaller()); statement.setString(2, phoneinfo.getCallee()); statement.setString(3, phoneinfo.getTime()); statement.setString(4, phoneinfo.getDur()); statement.setString(5, phoneinfo.getFlag()); } public void readFields(ResultSet resultSet) throws SQLException { } }