spark2.x由淺入深深到底系列六之RDD java api用JdbcRDD讀取關系型數據庫
阿新 • • 發佈:2017-09-21
spark 大數據 javaapi rdd jdbcrdd
學習任何的spark技術之前,請先正確理解spark,可以參考:正確理解spark
以下是用spark RDD java api實現從關系型數據庫中讀取數據,這裏使用的是derby本地數據庫,當然可以是mysql或者oracle等關系型數據庫:
package com.twq.javaapi.java7; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.rdd.JdbcRDD; import java.io.Serializable; import java.sql.*; public class JavaJdbcRDDSuite implements Serializable { public static void prepareData() throws ClassNotFoundException, SQLException { //使用本地數據庫derby,當然可以使用mysql等關系型數據庫 Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true"); try { //創建一張表FOO,ID是一個自增的主鍵,DATA是一個INTEGER列 Statement create = connection.createStatement(); create.execute( "CREATE TABLE FOO(" + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," + "DATA INTEGER)"); create.close(); //插入數據 PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)"); for (int i = 1; i <= 5; i++) { insert.setInt(1, i * 2); insert.executeUpdate(); } insert.close(); } catch (SQLException e) { // If table doesn‘t exist... if (e.getSQLState().compareTo("X0Y32") != 0) { throw e; } } finally { connection.close(); } } public static void shutdownDB() throws SQLException { try { DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true"); } catch (SQLException e) { // Throw if not normal single database shutdown // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html if (e.getSQLState().compareTo("08006") != 0) { throw e; } } } public static void main(String[] args) throws Exception { JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite"); //準備數據 prepareData(); //構建JdbcRDD JavaRDD<Integer> rdd = JdbcRDD.create( sc, new JdbcRDD.ConnectionFactory() { @Override public Connection getConnection() throws SQLException { return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb"); } }, "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?", 1, 5, 1, new Function<ResultSet, Integer>() { @Override public Integer call(ResultSet r) throws Exception { return r.getInt(1); } } ); //結果: [2, 4, 6, 8, 10] System.out.println(rdd.collect()); shutdownDB(); sc.stop(); } }
詳細了解RDD的api的話,可以參考: spark core RDD api原理詳解
spark2.x由淺入深深到底系列六之RDD java api用JdbcRDD讀取關系型數據庫