mapreduce 將hdfs資料逐行寫入mysql
阿新 • • 發佈:2019-01-25
code
package hdfsToSQL;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class hdfsToSQL {
static String driver = "com.mysql.jdbc.Driver";
// static String url = "jdbc:mysql://192.168.1.58:3306/powerloaddata?user=dbuser&password=lfmysql";
static String url = "jdbc:mysql://master:3306/test?user=root";
static Connection conn = null;
static Statement stmt = null;
static ResultSet rs = null;
public static class hdfsToSQLMapper extends Mapper<Object, Text, Text, IntWritable>{
public void map(Object key , Text value, Context context) throws IOException, InterruptedException {
// get lines
String line = value.toString();
String [] words = line.split(",");
if (words.length == 3){
try {
// write sql
Class.forName(driver);
conn = DriverManager.getConnection(url);
stmt = conn.createStatement();
String sql = "insert into DataPowerPrediction values("+words[0]+","+words[1]+","+words[2]+")";
stmt.executeUpdate(sql);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
try {
conn.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "hdfsToSQL");
job.setJarByClass(hdfsToSQL.class);
job.setMapperClass(hdfsToSQLMapper.class);
// job.setCombinerClass(IntSumReducer.class);
// job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
執行程式碼
/usr/hadoop/bin/hadoop jar hdfsToSQL.jar hdfsToSQL.hdfsToSQL hdfs://master:9000/user/root/data/foreastdatatest.csv hdfs://master:9000/user/root/output/hdfsToSQL4
結果
15/08/27 02:02:08 INFO mapreduce.Job: map 19% reduce 0%
15/08/27 02:02:11 INFO mapreduce.Job: map 33% reduce 0%
15/08/27 02:02:14 INFO mapreduce.Job: map 47% reduce 0%
15/08/27 02:02:17 INFO mapreduce.Job: map 62% reduce 0%
15/08/27 02:02:19 INFO mapreduce.Job: map 100% reduce 0%
15/08/27 02:02:24 INFO mapreduce.Job: map 100% reduce 100%
15/08/27 02:02:24 INFO mapreduce.Job: Job job_1440638983382_0001 completed successfully