spark load data from mysql

code first

On the local machine, start a Spark process via spark-shell.cmd:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().appName("Simple Application").master("local[2]").getOrCreate();

    Map<String, String> map = new HashMap<>();
    map.put("url", "jdbc:mysql:xxx");
    map.put("user", "user");
    map.put("password", "pass");
    String tableName = "table";
    map.put("dbtable", tableName);
    map.put("driver", "com.mysql.jdbc.Driver");
    String lowerBound = 1 + "";      // lower bound of the partition column
    String upperBound = 10000 + "";  // upper bound of the partition column

    map.put("fetchsize", "100000");    // rows pulled from the MySQL server per fetch; rs.next() can only advance after a fetch completes
    map.put("numPartitions", "50");    // 50 partition ranges: [lowerBound, upperBound] is split into 50 partitions, each running one query
    map.put("partitionColumn", "id");  // column used to build the partition predicates
    System.out.println("tableName:" + tableName + ", lowerBound:" + lowerBound + ", upperBound:" + upperBound);
    map.put("lowerBound", lowerBound);
    map.put("upperBound", upperBound);

    Dataset<Row> dataset = spark.read().format("jdbc").options(map).load();  // transformation (lazy)
    dataset.registerTempTable("tmp__");
    Dataset<Row> ds = spark.sql("select * from tmp__");  // transformation (lazy)
    ds.cache().show();  // action: triggers the actual SQL execution
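
As a side note, the same partitioned read can also be written with the DataFrameReader.jdbc overload that takes the partition column and bounds directly. A minimal sketch, reusing the placeholder URL, credentials and table name from the snippet above:

    import java.util.Properties;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark2 = SparkSession.builder().appName("Simple Application").master("local[2]").getOrCreate();

    Properties props = new Properties();
    props.put("user", "user");
    props.put("password", "pass");
    props.put("driver", "com.mysql.jdbc.Driver");
    props.put("fetchsize", "100000");

    // Equivalent to the options map above: partitionColumn=id, lowerBound=1, upperBound=10000, numPartitions=50
    Dataset<Row> dataset2 = spark2.read().jdbc(
            "jdbc:mysql:xxx",  // placeholder URL, as above
            "table",
            "id",              // partitionColumn
            1L,                // lowerBound
            10000L,            // upperBound
            50,                // numPartitions
            props);

    dataset2.cache().show();   // action: triggers execution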

When execution reaches show(), the job actually starts running. At this point we can debug the local process to trace how partitionColumn is ultimately applied.

Class to debug:

org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.buildScan

At this point, parts is a partition list of size 50.

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
    JDBCRDD.scanTable(
      sparkSession.sparkContext,
      schema,
      requiredColumns,
      filters,
      parts,
      jdbcOptions).asInstanceOf[RDD[Row]]
  }

The whereClause value inside a single partition:

whereClause = "id < 21 or id is null"
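
These predicates come from cutting the range [lowerBound, upperBound] into numPartitions strides: the first partition is open at the low end and also collects NULLs, and the last partition is open at the high end. A rough Java sketch of that logic (class and method names are mine; this only mirrors what JDBCRelation.columnPartition was observed doing while debugging, it is not the Spark source):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: derive per-partition WHERE clauses from
    // (partitionColumn, lowerBound, upperBound, numPartitions).
    public class PartitionClauseSketch {
        static List<String> buildWhereClauses(String column, long lower, long upper, int numPartitions) {
            long stride = upper / numPartitions - lower / numPartitions;
            List<String> clauses = new ArrayList<>();
            long currentValue = lower;
            for (int i = 0; i < numPartitions; i++) {
                String lBound = (i != 0) ? column + " >= " + currentValue : null;
                currentValue += stride;
                String uBound = (i != numPartitions - 1) ? column + " < " + currentValue : null;
                String where;
                if (uBound == null) {
                    where = lBound;                                  // last partition: open-ended upper bound
                } else if (lBound == null) {
                    where = uBound + " or " + column + " is null";   // first partition also picks up NULLs
                } else {
                    where = lBound + " AND " + uBound;
                }
                clauses.add("WHERE " + where);
            }
            return clauses;
        }

        public static void main(String[] args) {
            // The first clause has the shape "id < X or id is null", matching the
            // whereClause observed in the debugger; the exact boundary values depend
            // on the bounds used in the actual run.
            buildWhereClauses("id", 1, 10000, 50).forEach(System.out::println);
        }
    }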


Continue setting breakpoints further down, into the execution logic of a single part; at this point the code should be running in a task thread inside an Executor:
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute

    val myWhereClause = getWhereClause(part)

    val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
    stmt = conn.prepareStatement(sqlText,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    stmt.setFetchSize(options.fetchSize)
    rs = stmt.executeQuery()
    val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)

    CompletionIterator[InternalRow, Iterator[InternalRow]](
      new InterruptibleIterator(context, rowsIterator), close())    
      

At this point:
myWhereClause = WHERE id < 21 or id is null

The final SQL statement:
sqlText = SELECT id,xx FROM tablea WHERE id < 21 or id is null
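
In other words, each task behaves like a plain JDBC client running its own ranged query against MySQL. A hedged sketch of what a single task effectively executes (connection details are the same placeholders as above; the real code path is the compute method shown earlier):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // What a single task effectively runs against MySQL for its partition.
    public class SingleTaskQuerySketch {
        public static void main(String[] args) throws Exception {
            String sqlText = "SELECT id,xx FROM tablea WHERE id < 21 or id is null";  // one partition's query
            try (Connection conn = DriverManager.getConnection("jdbc:mysql:xxx", "user", "pass");
                 PreparedStatement stmt = conn.prepareStatement(
                         sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                stmt.setFetchSize(100000);  // the fetchsize option set earlier
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // each row is converted to an InternalRow and handed to the task's iterator
                    }
                }
            }
        }
    }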


Every part goes through compute.
After an Executor finishes its task, it sends the result back to the Driver:
Executor: Finished task 7.0 in stage 2.0 (TID 12). 1836 bytes result sent to driver

Summary

  • With numPartitions, partitionColumn, lowerBound and upperBound combined, Spark generates many parts, each corresponding to one query whereClause; the data is ultimately pulled by numPartitions tasks. For this reason partitionColumn must be an indexed column, otherwise efficiency drops dramatically.
  • The table schema is obtained automatically: the program runs a query of the form select * from tablea where 1=0 to read the column names and types (see the sketch below).
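
The WHERE 1=0 query returns an empty result set whose metadata still describes every column, so the schema can be read without pulling any data. A minimal JDBC sketch of that idea (placeholder connection details; Spark does the equivalent internally when it resolves the table schema):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;

    // Read column names and types without transferring any rows.
    public class SchemaProbeSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:mysql:xxx", "user", "pass");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM tablea WHERE 1=0")) {
                ResultSetMetaData meta = rs.getMetaData();
                for (int i = 1; i <= meta.getColumnCount(); i++) {
                    System.out.println(meta.getColumnName(i) + " : " + meta.getColumnTypeName(i));
                }
            }
        }
    }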
