
MapReduce: writing HDFS data into MySQL line by line

Code

package hdfsToSQL;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class hdfsToSQL {

    static String driver = "com.mysql.jdbc.Driver";
    // static String url = "jdbc:mysql://192.168.1.58:3306/powerloaddata?user=dbuser&password=lfmysql";
    static String url = "jdbc:mysql://master:3306/test?user=root";
    static Connection conn = null;
    static Statement stmt = null;

    public static class hdfsToSQLMapper extends Mapper<Object, Text, Text, IntWritable> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input value is one line of the CSV file.
            String line = value.toString();
            String[] words = line.split(",");
            if (words.length == 3) {
                try {
                    // Open a connection and insert this line into MySQL.
                    Class.forName(driver);
                    conn = DriverManager.getConnection(url);
                    stmt = conn.createStatement();
                    String sql = "insert into DataPowerPrediction values("
                            + words[0] + "," + words[1] + "," + words[2] + ")";
                    stmt.executeUpdate(sql);
                } catch (SQLException | ClassNotFoundException e) {
                    e.printStackTrace();
                } finally {
                    // Close the statement and connection; guard against nulls
                    // in case getConnection() failed.
                    try {
                        if (stmt != null) stmt.close();
                        if (conn != null) conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: hdfsToSQL <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "hdfsToSQL");
        job.setJarByClass(hdfsToSQL.class);
        job.setMapperClass(hdfsToSQLMapper.class);
        // No combiner or reducer is configured; the default identity
        // reducer simply passes the map output through.
        // job.setCombinerClass(IntSumReducer.class);
        // job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
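Note that the mapper above opens and closes a JDBC connection for every input line, which gets expensive on large files. A common refinement is to open the connection once per map task in setup() and close it in cleanup(), binding values through a PreparedStatement instead of concatenating SQL strings. The variant below is a minimal sketch, not part of the original post: it assumes the same three-column CSV input and the same DataPowerPrediction table, and it needs java.sql.PreparedStatement in addition to the imports above.

    public static class PooledMapper extends Mapper<Object, Text, Text, IntWritable> {

        private Connection conn;
        private PreparedStatement insert;

        @Override
        protected void setup(Context context) throws IOException {
            try {
                // One connection per map task instead of one per input line.
                Class.forName(driver);
                conn = DriverManager.getConnection(url);
                // Parameterized insert instead of string concatenation.
                insert = conn.prepareStatement(
                        "insert into DataPowerPrediction values (?, ?, ?)");
            } catch (ClassNotFoundException | SQLException e) {
                throw new IOException("cannot open MySQL connection", e);
            }
        }

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            if (words.length == 3) {
                try {
                    // Values bound as strings here; switch to setInt/setDouble
                    // to match the real (unshown) column types of the table.
                    insert.setString(1, words[0]);
                    insert.setString(2, words[1]);
                    insert.setString(3, words[2]);
                    insert.executeUpdate();
                } catch (SQLException e) {
                    throw new IOException("insert failed for line: " + value, e);
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException {
            try {
                if (insert != null) insert.close();
                if (conn != null) conn.close();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }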

Running the job

/usr/hadoop/bin/hadoop jar hdfsToSQL.jar hdfsToSQL.hdfsToSQL hdfs://master:9000/user/root/data/foreastdatatest.csv hdfs://master:9000/user/root/output/hdfsToSQL4

Result

15/08/27 02:02:08 INFO mapreduce.Job:  map 19% reduce 0%
15/08/27 02:02:11 INFO mapreduce.Job:  map 33% reduce 0%
15/08/27 02:02:14 INFO mapreduce.Job:  map 47% reduce 0%
15/08/27 02:02:17 INFO mapreduce.Job:  map 62% reduce 0%
15/08/27 02:02:19 INFO mapreduce.Job:  map 100% reduce 0%
15/08/27 02:02:24 INFO mapreduce.Job:  map 100% reduce 100%
15/08/27 02:02:24 INFO mapreduce.Job: Job job_1440638983382_0001 completed successfully
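For comparison, Hadoop also ships a built-in way to write job output to a relational database: DBOutputFormat in org.apache.hadoop.mapreduce.lib.db, which builds the INSERT statement, batches the writes, and commits when each task closes. A minimal sketch follows; the column names col1/col2/col3 are placeholders, since the post never shows the DataPowerPrediction schema, and the record class would need imports from org.apache.hadoop.mapreduce.lib.db, org.apache.hadoop.io, java.io, and java.sql.

    // Record type that DBOutputFormat knows how to write.
    public static class PredictionRecord implements Writable, DBWritable {
        String a, b, c;

        public PredictionRecord() { }

        public PredictionRecord(String a, String b, String c) {
            this.a = a; this.b = b; this.c = c;
        }

        @Override // DBWritable: bind fields into the INSERT that DBOutputFormat builds
        public void write(PreparedStatement ps) throws SQLException {
            ps.setString(1, a);
            ps.setString(2, b);
            ps.setString(3, c);
        }

        @Override
        public void readFields(ResultSet rs) throws SQLException { }

        @Override // Writable: unused in a map-only job, but required by the interface
        public void write(DataOutput out) throws IOException { }

        @Override
        public void readFields(DataInput in) throws IOException { }
    }

The job wiring then replaces FileOutputFormat, and the mapper emits context.write(new PredictionRecord(words[0], words[1], words[2]), NullWritable.get()):

    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
            "jdbc:mysql://master:3306/test?user=root");
    Job job = Job.getInstance(conf, "hdfsToSQL-db");
    job.setOutputFormatClass(DBOutputFormat.class);
    job.setOutputKeyClass(PredictionRecord.class);
    job.setOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(0); // map-only: records go straight to MySQL
    DBOutputFormat.setOutput(job, "DataPowerPrediction", "col1", "col2", "col3");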