
Two ways to load MySQL tables into HDFS via Spark

Load MySQL tables: SQLContext.load, then save the table in Parquet format
The SQLContext way is also built on JdbcRDD under the hood; SQLContext simply adds Parquet support on top of it.
package org.apache.spark.examples.sql

import java.sql.DriverManager

import org.apache.spark.sql.SQLContext
import org.apache.spark.{ SparkConf, SparkContext }

/**
 * @author ChenFangFang
 */
object LoadFromMysql_SqlContext {

  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println("Usage: LoadFromMysql_SqlContext <url> <username> <password> <table> <id> <output>")
      System.exit(1)
    }

    val Array(url, username, password, table, id, output) = args
    val lines_each_part = 2000000 // number of rows each part file should hold

    // Register the MySQL JDBC driver.
    Class.forName("com.mysql.jdbc.Driver").newInstance

    val connection = DriverManager.getConnection(url, username, password)

    // For partitioning, derive lower_bound, upper_bound and the partition count
    // from the table itself. The original post elides this step; a minimal
    // sketch using MIN/MAX on the partition column is shown here.
    val stmt = connection.createStatement()
    val rs = stmt.executeQuery(s"SELECT MIN($id), MAX($id) FROM $table")
    rs.next()
    val lower_bound = rs.getLong(1)
    val upper_bound = rs.getLong(2)
    val partitions = ((upper_bound - lower_bound) / lines_each_part + 1).toInt
    connection.close()

    val sc = new SparkContext(new SparkConf().setAppName("LoadFromMysql"))
    val sqlContext = new SQLContext(sc)
    // Pass the credentials as part of the JDBC URL.
    val url_total = url + "?user=" + username + "&password=" + password

    val options = Map(
      "driver" -> "com.mysql.jdbc.Driver",
      "url" -> url_total,
      "dbtable" -> table,
      "partitionColumn" -> id,
      "lowerBound" -> lower_bound.toString,
      "upperBound" -> upper_bound.toString,
      "numPartitions" -> partitions.toString)

    // Load the table through the JDBC data source; Spark turns the bounds and
    // partition count into one range query per partition against MySQL.
    val jdbcDF = sqlContext.load("jdbc", options)

    // save() writes with the default data source, which is Parquet.
    jdbcDF.save(output)
  }
}
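
To run the job, the MySQL JDBC driver has to be on the classpath of both the driver and the executors. One way (an illustrative command, not from the original post) is spark-submit --class org.apache.spark.examples.sql.LoadFromMysql_SqlContext --jars mysql-connector-java.jar load-from-mysql.jar <url> <username> <password> <table> <id> <output>, where load-from-mysql.jar is a hypothetical name for the application jar.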
Process data: use spark-shell directly on the Parquet output
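Since save() wrote Parquet, spark-shell can read the output back without any schema definition. A minimal sketch, assuming Spark 1.3-era APIs and a placeholder output path:

// spark-shell already provides sc and sqlContext.
// "/path/to/output" stands in for the <output> argument passed to the job.
val df = sqlContext.parquetFile("/path/to/output")

// Register the DataFrame as a temporary table so it can be queried with SQL.
df.registerTempTable("mysql_table")
sqlContext.sql("SELECT COUNT(*) FROM mysql_table").show()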