使用 aspectj 對 spark 進行攔截
阿新 • • 發佈:2018-12-13
文章目錄
背景
開源產品要想用的得心應手免不了要根據公司的業務/場景對其做一些改造,如果直接在原始碼的層面對其修改,當下可能用的很省心,但後期與社群程式碼的合併,版本的升級的時候就相當糟心了。
對於一個平臺來說,使用者對技術本身是不敏感的,所以我們需要增加一些限制來減少叢集的一些不可控情況,例如不斷的寫入新表/新資料卻不記得刪除,大量不按規範建立的表名等情況。與此同時應儘量讓技術對使用者透明,比如讓其無感知的訪問多種型別的資料庫。
下文以攔截 spark.sql()
如何使用
程式碼
SparkSqlAspect.scala
為了便於理解以下程式碼會進行一些刪減
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{Around, Aspect}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{Dataset, Row, SparkSession, TiContext}
import cn.tongdun.datacompute.parser._
import cn.tongdun.datacompute.parser.spark.SparkSQLHelper
@Aspect
class SparkSqlAspect {
private val logger = LoggerFactory.getLogger(classOf[SparkSqlAspect])
private var tiContext: TiContext = null
@Around("execution(public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> org.apache.spark.sql.SparkSession.sql(java.lang.String)) && args(sqlRaw)" )
def around(pjp: ProceedingJoinPoint,
sqlRaw: String): Dataset[Row] = {
//sparkSQLHelper 是我們基於 antlr4 增加了一些 sparksql 語法的支援,例如建表時需要指定 lifecycle 等
val sql = SparkSQLHelper.format(sqlRaw)
val spark = pjp.getThis.asInstanceOf[SparkSession]
var dataset: Dataset[Row] = spark.emptyDataFrame
val statementData = SparkSQLHelper.getStatementData(sql)
val statement = statementData.getStatement()
//getType 方法用於獲取sql的型別
statementData.getType match {
case StatementType.CREATE_TABLE =>
createMethod()
case StatementType.CREATE_TABLE_AS_SELECT =>
createAsSelectMethod()
case StatementType.SELECT =>
dataset = selectMethod(spark, inputSql, statement, pjp)
case _ =>
dataset = pjp.proceed(pjp.getArgs).asInstanceOf[Dataset[Row]]
}
dateset
}
// 建表必須帶有 lifecycle 欄位,並對錶名進行校驗,將相關資訊註冊到元資料系統等操作
def createMethod(): Unit = {
...
}
// 約定 create table as select 生成的表都為中間表,必須以 tdl_ 開頭,lifecycle 固定為7天
def createAsSelectMethod(): Unit = {
...
}
// select 對多個數據庫源進行判定以及對許可權進行校驗,下面以tidb為例
def selectMethod(spark: SparkSession,
inputSql: String,
statement: Statement,
pjp: ProceedingJoinPoint): Dataset[Row] = {
val tableData = statement.asInstanceOf[TableData]
//獲取所有需要訪問的源表
tableData.getInputTables.toArray.foreach {
case t: TableSource =>
val databaseName = t.getDatabaseName
val tableName = t.getTableName
val fullTableName = databaseName + "." + tableName
//所有tidb的庫都以tidb為字首
if (t.getDatabaseName.startsWith("tidb")) {
//對tidb表許可權進行校驗
if(tableAuthCheck(...)){
//判斷tiContext是否初始化
if (tiContext == null) {
tiContext = new TiContext(spark)
}
//對tidb表的表名進行替換,避免與現有的臨時表/中間表衝突
val replacedTable = "tdl_" + databaseName + "_" + tableName
//加入tidb表資料來源
tiContext.tidbMapTable(databaseName, tableName)
//註冊為臨時表
tiContext.getDataFrame(databaseName, tableName).createOrReplaceTempView(replacedTable)
//將sql語句中的表名進行替換
sql = StringUtils.replace(sql, fullTableName, replacedTable)
} else {
throw new IllegalAccessError(fullTableName + "沒有訪問許可權")
}
}
case _ =>
}
pjp.proceed(Array(sql)).asInstanceOf[Dataset[Row]]
}
}
配置
pom.xml
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.9.1</version>
</dependency>
<!--公司內部版本,用於支援spark2.3-->
<dependency>
<groupId>com.pingcap.tispark</groupId>
<artifactId>tispark-core</artifactId>
<version>1.1-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
resources/META-INF/AspectSql.aj
<?xml version="1.0" encoding="UTF-8" ?>
<aspectj>
<aspects>
<aspect name="cn.tongdun.aspectj.SparkSqlAspect"/>
</aspects>
<weaver options="-Xset:weaveJavaxPackages=true"/>
</aspectj>
spark-defaults.conf
spark.driver.extraClassPath /path/to/spark-aspectj.jar
spark.driver.extraJavaOptions -javaagent:/home/admin/aspectjweaver-1.9.1.jar
結語
通過上述的操作,在使用者呼叫 spark.sql(...)
時將會觸發相應的方法。hdfs/rdd/sparkSession/etc.
操作同理可實現。
不同公司面臨的真實場景各有不同,因此並沒有過多的實現細節,僅給需要的同學提供一些思路。