Using Spark to Read and Analyze SQL Stored in ES

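Users query data stored in Elasticsearch through elasticsearch-sql. Assuming the query statements themselves are saved in Elasticsearch beforehand, how can we compute statistics on the tables these SQL statements reference?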

Reading Elasticsearch from Spark

import org.elasticsearch.spark._
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization

// es.mapping.date.rich=false returns date fields as raw values instead of date objects
val esOptions = Map("es.nodes" -> "localhost", "es.port" -> "9200", "es.mapping.date.rich" -> "false")
// Each element is a (document id, field map) pair
val esRDD = spark.sparkContext.esRDD("collectorapimetricslog2-2020.12/logs", esOptions)
esRDD.take(20).foreach(println)
// Serialize each document's field map to a JSON string
val esJsonRDD = esRDD.map { case (_, fields) =>
  implicit val json4sFormats: Formats = DefaultFormats
  Serialization.writePretty(fields)
}
// Build the DataFrame from the JSON strings, not from the raw esRDD
val esDF = spark.read.json(esJsonRDD)

The query statements are read from ES as an RDD, converted to JSON strings, and then converted to a DataFrame.

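Why not use the DataFrame interface that Elasticsearch-Hadoop provides? Because reading directly as a DataFrame runs into a variety of format mismatches and errors; elasticsearch-hadoop still leaves many compatibility details unaddressed.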

JSqlParser

Use JSqlParser to find the tables referenced in each query statement.

Step 1: load the jsqlparser library

bin/spark-shell --packages "com.github.jsqlparser:jsqlparser:3.1"

Step 2: the analysis code. First rewrite the parts of the query that the parser fails to recognize, then parse it.

import net.sf.jsqlparser.parser.CCJSqlParserUtil
import net.sf.jsqlparser.statement.select.Select
import net.sf.jsqlparser.util.TablesNamesFinder

// Quick check on a hand-written query
val stmt = CCJSqlParserUtil.parse("select * from tab1 a join tab2 b on a.id=b.id")
val sel = stmt.asInstanceOf[Select]
val tblFinder = new TablesNamesFinder()
tblFinder.getTableList(sel)  // java.util.List of the table names found in the statement

import scala.collection.JavaConverters._
import spark.implicits._  // already in scope in spark-shell; provides the String encoder for map

// Keep only the queries issued against the es engine
val esQueryContentDF = esDF.filter("engine=='es'").select("queryContent")
val parsedQueryDF = esQueryContentDF.map(r => {
    val originalQuery = r.getString(0)
    try {
        // Empty out the date_histogram arguments before parsing.
        // The match is greedy: it runs to the last ")" in the query.
        val dateHistoPattern = "date_histogram(?:.*[)])".r
        val sQuery2 = dateHistoPattern.replaceAllIn(originalQuery, "date_histogram()")
        // Backtick-quote index names of the form name-2020.12
        val qPattern = raw"(\w+-[\d.]+)".r
        val queryStr = qPattern.replaceAllIn(sQuery2, "`$1`")
        val stmt = CCJSqlParserUtil.parse(queryStr)
        val tblLst = new TablesNamesFinder().getTableList(stmt.asInstanceOf[Select])
        tblLst.asScala.mkString(",")
    } catch {
        // Mark failures so they can be filtered out afterwards
        case ex: Exception => "exception: " + originalQuery
    }
})
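
To see what the two regex rewrites do before any parsing happens, here is a minimal sketch in plain Scala, with no Spark or JSqlParser needed. The object name QueryRewrite and the sample query are made up for illustration; the two patterns are the ones used in the listing above.

```scala
object QueryRewrite {
  // Same two patterns as in the listing above.
  private val dateHisto = "date_histogram(?:.*[)])".r
  private val hyphenatedName = raw"(\w+-[\d.]+)".r

  /** Rewrites an elasticsearch-sql query so a generic SQL parser has a chance to read it. */
  def rewrite(query: String): String = {
    // Greedy match: replaces everything from "date_histogram" through the last ")" in the query.
    val noHistoArgs = dateHisto.replaceAllIn(query, "date_histogram()")
    // Backtick-quote names such as logs-2020.12 so they are read as one identifier.
    hyphenatedName.replaceAllIn(noHistoArgs, "`$1`")
  }
}
```

For the hypothetical input select count(*) from logs-2020.12 group by date_histogram(field='ts','interval'='1d'), QueryRewrite.rewrite returns select count(*) from `logs-2020.12` group by date_histogram(). Because the first pattern is greedy, any clause that follows date_histogram(...) and contains a closing parenthesis is swallowed as well, which is acceptable here since only the table names are of interest.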

parsedQueryDF.filter(" value not like 'exception%'").createOrReplaceTempView("parsed_query")
spark.sql("select split(replace(value,'`',''),'-')[0] from parsed_query").distinct.collect.foreach(println)
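
The last statement prints the distinct index-name prefixes. If a count per prefix is wanted instead, the same clean-up can be sketched on a local collection, for instance after collecting parsedQueryDF to the driver. This is an illustrative sketch only: TableTally and countPrefixes are made-up names, and unlike the one-liner above it also splits comma-joined table lists so that every table in a join is counted.

```scala
object TableTally {
  /** Counts index-name prefixes over the comma-joined table lists produced by the parsing step. */
  def countPrefixes(parsed: Seq[String]): Map[String, Int] =
    parsed
      .filterNot(_.startsWith("exception"))                   // skip rows where parsing failed
      .flatMap(_.split(","))                                  // one entry per referenced table
      .map(_.trim.replace("`", "").takeWhile(_ != '-'))       // `logs-2020.12` becomes logs
      .groupBy(identity)
      .map { case (name, hits) => name -> hits.size }
}
```

A call such as TableTally.countPrefixes(parsedQueryDF.collect().toSeq) would then give one count per prefix, assuming the result set is small enough to collect.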