sparksql通過jdbc讀取mysql時劃分分割槽問題
阿新 • • 發佈:2018-12-20
當通過spark讀取mysql時,如果資料量比較大,為了加快速度,通常會起多個task並行拉取mysql資料。 其中一個api是
def
jdbc(url: String, table: String, columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int, connectionProperties: Properties): DataFrame
引數 | 說明 |
---|---|
url | 訪問mysql時的jdbc連結,如jdbc:mysql://190.1.98.225:2049/test |
table | 訪問的表 |
columnName | 用於分割槽的列,必須是數字型別 |
lowerBound | 分割槽列的最小值 |
upperBound | 分割槽列的最大值 |
numPartitions | 預期的分割槽數 |
connectionProperties | mysql的配置引數,key value形式 |
這裡面容易引起混淆的是lowerBound和upperBound。需要注意的是lowerBound和upperBound僅用於決定劃分分割槽時的步長,而不是用於按照這兩個值對資料進行過濾。 因此,無論這兩個值如何設定,表中的所有行都將被讀取。
同時需要注意的是,儘量不要建立太多分割槽,否則很容易將mysql搞掛。
關於具體的分割槽,我寫了個示例程式碼,參考如下(本部分程式碼參考spark原始碼org.apache.spark.sql.execution.datasources.jdbc中columnPartition方法 )。
程式碼如下:
import scala.collection.mutable.ArrayBuffer
object PrintJdbcParition {
case class JDBCPartition(whereClause: String, partitionIndex: Int)
def main(args: Array[String]): Unit = {
val numPartitions = 10
val lowerBound = 100
val upperBound = 900
val column = "id"
// Overflow and silliness can happen if you subtract then divide.
// Here we get a little roundoff, but that's (hopefully) OK.
val stride: Long = (upperBound / numPartitions - lowerBound / numPartitions)
var i: Int = 0
var currentValue: Long = lowerBound
var ans = new ArrayBuffer[JDBCPartition]()
while (i < numPartitions) {
val lowerBound = if (i != 0) s"$column >= $currentValue" else null
currentValue += stride
val upperBound = if (i != numPartitions - 1) s"$column < $currentValue" else null
val whereClause =
if (upperBound == null) {
lowerBound
} else if (lowerBound == null) {
upperBound
} else {
s"$lowerBound AND $upperBound"
}
ans += JDBCPartition(whereClause, i)
i = i + 1
}
ans.toArray.map(println(_))
}
}
程式碼執行結果如下:
JDBCPartition(id < 180,0)
JDBCPartition(id >= 180 AND id < 260,1)
JDBCPartition(id >= 260 AND id < 340,2)
JDBCPartition(id >= 340 AND id < 420,3)
JDBCPartition(id >= 420 AND id < 500,4)
JDBCPartition(id >= 500 AND id < 580,5)
JDBCPartition(id >= 580 AND id < 660,6)
JDBCPartition(id >= 660 AND id < 740,7)
JDBCPartition(id >= 740 AND id < 820,8)
JDBCPartition(id >= 820,9)