spark load data from mysql
Code first. A Spark process is started on the local machine via spark-shell.cmd:
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("Simple Application")
        .master("local[2]")
        .getOrCreate();

Map<String, String> map = new HashMap<>();
map.put("url", "jdbc:mysql:xxx");
map.put("user", "user");
map.put("password", "pass");
String tableName = "table";
map.put("dbtable", tableName);
map.put("driver", "com.mysql.jdbc.Driver");
String lowerBound = 1 + "";        // lower bound of the partition column
String upperBound = 10000 + "";    // upper bound of the partition column
map.put("fetchsize", "100000");    // rows fetched per round trip from the MySQL server; rs.next() can only proceed after a fetch
map.put("numPartitions", "50");    // split the range [lowerBound, upperBound] into 50 partitions; each partition issues one query
map.put("partitionColumn", "id");  // column used to partition the table
System.out.println("tableName:" + tableName + ", lowerBound:" + lowerBound + ", upperBound:" + upperBound);
map.put("lowerBound", lowerBound);
map.put("upperBound", upperBound);

Dataset<Row> dataset = spark.read().format("jdbc").options(map).load(); // transformation
dataset.registerTempTable("tmp__");
Dataset<Row> ds = spark.sql("select * from tmp__"); // transformation
ds.cache().show(); // action: triggers the actual SQL execution
```
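For reference, the same partitioned read can also be written with the DataFrameReader.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, props) overload instead of an options map. A minimal sketch using the same placeholder connection settings as above:

```java
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JdbcReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Simple Application")
                .master("local[2]")
                .getOrCreate();

        Properties props = new Properties();
        props.put("user", "user");
        props.put("password", "pass");
        props.put("driver", "com.mysql.jdbc.Driver");
        props.put("fetchsize", "100000");

        // jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties)
        // splits [1, 10000] on column "id" into 50 partition queries, like the options map above
        Dataset<Row> dataset = spark.read()
                .jdbc("jdbc:mysql:xxx", "table", "id", 1L, 10000L, 50, props);

        dataset.show();
    }
}
```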
When execution reaches show(), the job actually starts running. At this point we debug on the local machine to trace how partitionColumn is ultimately implemented.
The class to debug:
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.buildScan
At this point parts is a partition list of size 50:
```scala
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
  JDBCRDD.scanTable(
    sparkSession.sparkContext,
    schema,
    requiredColumns,
    filters,
    parts,
    jdbcOptions).asInstanceOf[RDD[Row]]
}
```
The whereClause value inside a single partition:
whereClause = "id < 21 or id is null"
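Those per-partition where clauses come from splitting the range [lowerBound, upperBound] into equal strides on partitionColumn; the first part also sweeps in NULL values, and the first and last parts are left open-ended so no rows outside the bounds are lost. Below is a simplified sketch of that stride logic, an approximation for illustration rather than the exact code of JDBCRelation.columnPartition:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionClauseSketch {

    // Roughly mirrors how Spark turns (partitionColumn, lowerBound, upperBound, numPartitions)
    // into one WHERE clause per partition. Simplified for illustration only.
    static List<String> columnPartition(String column, long lower, long upper, int numPartitions) {
        List<String> clauses = new ArrayList<>();
        long stride = (upper - lower) / numPartitions;   // width of each id range
        long currentValue = lower;
        for (int i = 0; i < numPartitions; i++) {
            String lowerClause = (i == 0) ? null : column + " >= " + currentValue;
            currentValue += stride;
            String upperClause = (i == numPartitions - 1) ? null : column + " < " + currentValue;
            if (lowerClause == null) {
                // the first partition also picks up NULLs and anything below lowerBound
                clauses.add("WHERE " + upperClause + " or " + column + " is null");
            } else if (upperClause == null) {
                // the last partition is open-ended above upperBound
                clauses.add("WHERE " + lowerClause);
            } else {
                clauses.add("WHERE " + lowerClause + " AND " + upperClause);
            }
        }
        return clauses;
    }

    public static void main(String[] args) {
        // With lowerBound=1, upperBound=1001, numPartitions=50 the first clause comes out as
        // "WHERE id < 21 or id is null", matching the clause seen in the debugger above.
        columnPartition("id", 1, 1001, 50).forEach(System.out::println);
    }
}
```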
Continue setting breakpoints further down, into the execution logic of a single part; by this point the code is running in a task thread inside an Executor:
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute
```scala
val myWhereClause = getWhereClause(part)

val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(options.fetchSize)
rs = stmt.executeQuery()
val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)

CompletionIterator[InternalRow, Iterator[InternalRow]](
  new InterruptibleIterator(context, rowsIterator), close())
```
At this point:
myWhereClause = "WHERE id < 21 or id is null"
The final SQL statement:
sqlText = SELECT id, xx FROM tablea WHERE id < 21 or id is null
Every part goes through compute.
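In other words, each task effectively issues a plain JDBC query against MySQL with its own partition's WHERE clause. A standalone sketch of what one task does, using java.sql directly (connection details are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PartitionQuerySketch {
    public static void main(String[] args) throws Exception {
        // One partition's query: the same shape as the sqlText built in JDBCRDD.compute
        String sqlText = "SELECT id, xx FROM tablea WHERE id < 21 or id is null";

        try (Connection conn = DriverManager.getConnection("jdbc:mysql:xxx", "user", "pass");
             PreparedStatement stmt = conn.prepareStatement(sqlText,
                     ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(100000);            // matches the "fetchsize" option
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {               // each row becomes an InternalRow in Spark
                    System.out.println(rs.getLong("id"));
                }
            }
        }
    }
}
```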
After the Executor finishes its task, it sends the result information back to the Driver:
Executor: Finished task 7.0 in stage 2.0 (TID 12). 1836 bytes result sent to driver
Summary
- With numPartitions, partitionColumn, lowerBound, and upperBound combined, Spark generates many parts, each with its own query whereClause; the data is then pulled in numPartitions tasks. For this reason partitionColumn must be an indexed column, otherwise efficiency drops sharply.
- The table schema is obtained automatically: the program runs a query of the form select * from tablea where 1=0 to get the column names and types (see the sketch below).
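To illustrate that schema probe with plain JDBC: the 1=0 query returns no rows, but its ResultSetMetaData still describes every column. A minimal sketch (connection details are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

public class SchemaProbeSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:mysql:xxx", "user", "pass");
             PreparedStatement stmt = conn.prepareStatement("SELECT * FROM tablea WHERE 1=0");
             ResultSet rs = stmt.executeQuery()) {
            // No rows come back, but the metadata lists every column and its type
            ResultSetMetaData md = rs.getMetaData();
            for (int i = 1; i <= md.getColumnCount(); i++) {
                System.out.println(md.getColumnName(i) + " : " + md.getColumnTypeName(i));
            }
        }
    }
}
```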