Two ways to load MySQL tables into HDFS via Spark
Load MySQL tables: SQLContext.load and save the table in Parquet format
The SQLContext way is also based on JdbcRDD underneath; Spark simply provides better Parquet support through SQLContext.
package org.apache.spark.examples.sql

import java.sql.DriverManager
import java.util.HashMap

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLContext

/**
 * @author ChenFangFang
 */
object LoadFromMysql_SqlContext {
  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println("Usage: LoadFromMysql_SqlContext <url> <username> <password> <table> <id> <output>")
      System.exit(1)
    }
    val Array(url, username, password, table, id, output) = args

    val lines_each_part = 2000000 // rows each part file should contain, used to derive the number of partitions

    // Open a plain JDBC connection first to determine the partition bounds of the id column
    Class.forName("com.mysql.jdbc.Driver").newInstance
    val connection = DriverManager.getConnection(url, username, password)
    // for partitions, get lower_bound and upper_bound
    ......

    val sc = new JavaSparkContext(new SparkConf().setAppName("LoadFromMysql"))
    val sqlContext = new SQLContext(sc)

    // JDBC data source options: the table is read in `partitions` parallel range scans
    // over the numeric column `id`, between lower_bound and upper_bound
    val url_total = url + "?user=" + username + "&password=" + password
    val options: HashMap[String, String] = new HashMap
    options.put("driver", "com.mysql.jdbc.Driver")
    options.put("url", url_total)
    options.put("dbtable", table)
    options.put("lowerBound", lower_bound.toString)
    options.put("upperBound", upper_bound.toString)
    options.put("numPartitions", partitions.toString)
    options.put("partitionColumn", id)

    // Load the table as a DataFrame and save it to HDFS (Parquet is the default format)
    val jdbcDF = sqlContext.load("jdbc", options)
    jdbcDF.save(output)
  }
}
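
For reference, the other way alluded to in the title is to use JdbcRDD directly and write plain text files instead of Parquet. The sketch below is not the author's code; it is a minimal example assuming the same command-line arguments, with illustrative placeholder values for lower_bound, upper_bound and partitions (in practice they would be computed over JDBC, as in the elided part above).

import java.sql.{ DriverManager, ResultSet }

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.JdbcRDD

// Hypothetical object name; mirrors the arguments of LoadFromMysql_SqlContext above
object LoadFromMysql_JdbcRdd {
  def main(args: Array[String]) {
    val Array(url, username, password, table, id, output) = args
    val sc = new SparkContext(new SparkConf().setAppName("LoadFromMysql_JdbcRdd"))

    // Placeholder bounds; in practice computed from MIN(id)/MAX(id) over JDBC
    val lower_bound = 0L
    val upper_bound = 10000000L
    val partitions = 10

    // Each partition runs the query with the two '?' bound to its slice of [lower_bound, upper_bound]
    val rows = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver").newInstance
        DriverManager.getConnection(url, username, password)
      },
      s"SELECT * FROM $table WHERE $id >= ? AND $id <= ?",
      lower_bound, upper_bound, partitions,
      (rs: ResultSet) => (1 to rs.getMetaData.getColumnCount).map(i => rs.getString(i)).mkString("\t"))

    // One tab-separated line per row, written as plain text on HDFS
    rows.saveAsTextFile(output)
  }
}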
Process data: use spark-shell directly on the Parquet output
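Once the table has been saved as Parquet, it can be explored from spark-shell without any extra code. A minimal sketch, assuming a Spark 1.x shell where sqlContext is already created; the HDFS path and the temp table name are placeholders, not values from the original post:

// inside spark-shell; sqlContext already exists
val df = sqlContext.parquetFile("hdfs:///user/example/mysql_table_parquet") // the <output> path used above
df.printSchema()
df.registerTempTable("mysql_table")
sqlContext.sql("SELECT COUNT(*) FROM mysql_table").show()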