Adding JDBC Schema Reuse to Spark
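1) Reason for the change
Users sometimes need to read a large number of database tables that share the same schema, for example 1000 tables produced by sharding a MySQL database across tables and databases, all of which must be read in full. Every time a DataFrame is created, JDBCRelation queries the database for that table's schema, and across many tables this costs a lot of time. This article proposes a modification: when the user knows the tables share the same schema, the schema can be fetched once and reused.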
2) Code flow
val df = sqlContext.read().format("jdbc").options(dfOptions).load();
->resolved = ResolvedDataSource(
    sqlContext,
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap) // resolve the data source and obtain the schema parameters for jdbc, parquet, json
->dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) // pass in the options
->JDBCRelation(url, table, parts, properties)(sqlContext) // build the JDBC relation
->override val schema = JDBCRDD.resolveTable(url, table, properties) // fetch the schema
->conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery() // queries the database directly; this is the step to optimize
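3) Modification approach
When the user already knows the schema, there is no need to fetch it repeatedly;
The user decides whether the schema is reused, which keeps the change to the code flow minimal;
The changes: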
a) The user enables schema reuse by passing a key through the options:
dfOptions.put("jdbcschemakey","sparkourtest");
b) Create a hash table that stores the schemas already fetched:
val schemaHashTable = new java.util.HashMap[String, StructType]()
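c) Schema retrieval flow: if the options contain jdbcschemakey, look the key up in the hash table; on a hit, return the stored schema; on a miss, call JDBCRDD.resolveTable, store the result under the key, and return it. If no key is set, call JDBCRDD.resolveTable as before.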
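The schema lookup in step c) can be sketched without Spark on the classpath. The following is a minimal illustration only, not the patch itself: the class name SchemaCacheSketch, the type parameter S (standing in for StructType), and the Supplier (standing in for JDBCRDD.resolveTable) are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the patched lookup; S plays the role of StructType. */
final class SchemaCacheSketch<S> {
    // Mirrors the role of JDBCRelation.schemaHashTable: reuse key -> schema fetched earlier.
    private final Map<String, S> schemaHashTable = new HashMap<>();

    /**
     * @param schemaKey    value of the "jdbcschemakey" option, or null when reuse is off
     * @param resolveTable stand-in for JDBCRDD.resolveTable(url, table, properties)
     */
    S getSchema(String schemaKey, Supplier<S> resolveTable) {
        if (schemaKey == null) {
            return resolveTable.get();            // reuse disabled: query the database every time
        }
        S stored = schemaHashTable.get(schemaKey);
        if (stored == null) {
            stored = resolveTable.get();          // first table with this key: query once
            schemaHashTable.put(schemaKey, stored);
        }
        return stored;
    }

    public static void main(String[] args) {
        SchemaCacheSketch<String> cache = new SchemaCacheSketch<>();
        int[] queries = {0};                      // counts simulated schema queries
        Supplier<String> resolver = () -> { queries[0]++; return "id INT, name STRING"; };

        for (int i = 0; i < 1000; i++) {
            cache.getSchema("sparkourtest", resolver);
        }
        System.out.println("with key: " + queries[0] + " schema queries");     // with key: 1 schema queries

        queries[0] = 0;
        for (int i = 0; i < 1000; i++) {
            cache.getSchema(null, resolver);
        }
        System.out.println("without key: " + queries[0] + " schema queries");  // without key: 1000 schema queries
    }
}
```

Two consequences of this design are worth keeping in mind. The cache is keyed by the user-supplied string rather than by table name, so every table loaded with the same key receives the first schema that was stored, whether or not the tables really match. A plain java.util.HashMap is also not thread-safe, so if relations may be created from several threads at once, a ConcurrentHashMap would be the safer choice.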
4) Code changes
29a30
> import org.apache.spark.Logging
40c41
< private[sql] object JDBCRelation {
---
> private[sql] object JDBCRelation{
48a50,55
>
>
>   // add by Ricky for get same table schema
>
>   val schemaHashTable= new java.util.HashMap[String,StructType]()
>
117c124
< private[sql] case class JDBCRelation(
---
> private[sql] case class JDBCRelation (
124c131
<   with InsertableRelation {
---
>   with InsertableRelation with Logging{
128c135,160
<   override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
---
>
>   // add by Ricky for get same table schema
>   def getSchema():StructType={
>     //val schemaKey = properties.getProperty("jdbcSchemaKey")
>     val schemaKey = properties.getProperty("jdbcschemakey")
>     if (schemaKey != null) {
>       val schemaStored = JDBCRelation.schemaHashTable.get(schemaKey)
>       if (schemaStored != null) {
>         schemaStored
>       } else {
>         val schemaStored = JDBCRDD.resolveTable(url, table, properties)
>         logInfo("schemaKey configed,schemaHashTable empty,now put "+schemaKey.toString)
>         JDBCRelation.schemaHashTable.put(schemaKey, schemaStored)
>         schemaStored
>       }
>     }
>     else
>     {
>       JDBCRDD.resolveTable(url, table, properties)
>     }
>
>   }
>
>   override val schema: StructType = getSchema()
>   // end by Ricky
>   // override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
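On the caller side, every sharded table would carry the same jdbcschemakey value so that only the first load queries the database for its schema. The sketch below shows one way to build those option maps; the class ShardOptionsSketch, the JDBC URL, and the table prefix orders_ are hypothetical and chosen only for illustration, while url and dbtable are the standard Spark JDBC options. The key is written in lowercase, matching the patch, presumably because the options pass through CaseInsensitiveMap before reaching JDBCRelation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShardOptionsSketch {
    /** Builds one JDBC options map per shard; all shards carry the same reuse key. */
    static List<Map<String, String>> shardOptions(String url, String tablePrefix,
                                                  int shards, String schemaKey) {
        List<Map<String, String>> all = new ArrayList<>();
        for (int i = 0; i < shards; i++) {
            Map<String, String> dfOptions = new HashMap<>();
            dfOptions.put("url", url);
            dfOptions.put("dbtable", tablePrefix + i);
            dfOptions.put("jdbcschemakey", schemaKey); // option read by the patched JDBCRelation
            all.add(dfOptions);
        }
        return all;
    }

    public static void main(String[] args) {
        // Hypothetical connection URL and table prefix, for illustration only.
        List<Map<String, String>> opts =
            shardOptions("jdbc:mysql://db-host:3306/demo", "orders_", 3, "sparkourtest");
        for (Map<String, String> o : opts) {
            // With Spark available, each map would be passed to
            // sqlContext.read().format("jdbc").options(o).load()
            System.out.println(o.get("dbtable") + " -> " + o.get("jdbcschemakey"));
        }
    }
}
```

Tables with a different layout would need a different key, or no key at all, since the patched lookup does not compare table names.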