
Defining your own rules in Spark SQL

1) Download the Spark source code from GitHub. (All of the code below is code to be added; find the corresponding file and add it there.)

2) Find the SqlBase.g4 file. The OFFSET keyword is used as the running example in the steps below.

   2.1) Locate the following rule:

queryOrganization
    : (ORDER BY order+=sortItem (',' order+=sortItem)*)?
      (CLUSTER BY clusterBy+=expression (',' clusterBy+=expression)*)?
      (DISTRIBUTE BY distributeBy+=expression (',' distributeBy+=expression)*)?
      (SORT BY sort+=sortItem (',' sort+=sortItem)*)?
      windows?
      (OFFSET offset=expression)?    // newly added: an OFFSET clause, similar to the keyword in Oracle
      (LIMIT (ALL | limit=expression))? ;
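Note that, depending on the Spark version, OFFSET may not yet exist as a lexer token; in that case it also has to be declared in the keyword section of SqlBase.g4 (for example `OFFSET: 'OFFSET';`) and, if it should stay usable as an identifier, added to the nonReserved rule. With the rule above, the intended syntax has the form `SELECT * FROM t ORDER BY id OFFSET 5 LIMIT 10`.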

  2.2) In the package org.apache.spark.sql.execution, add the following to limit.scala:

/**
    * Physical operator for OFFSET: skips the first `offset` rows and returns the next `limit` rows.
    * @param offset number of rows to skip
    * @param limit  maximum number of rows to return after the offset
    * @param child  child physical plan
    */
  case class OffsetExec(offset: Int, limit: Int, child: SparkPlan) extends UnaryExecNode {

    override def output: Seq[Attribute] = child.output

    override def outputPartitioning: Partitioning = SinglePartition

    override def executeCollect(): Array[InternalRow] = child.executeTake(offset + limit).drop(offset)

    private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

    protected override def doExecute(): RDD[InternalRow] = {
      // Take the first offset + limit rows of every partition, shuffle them into a
      // single partition, then drop the first offset rows there.
      val locallyLimited = child.execute().mapPartitionsInternal(_.take(offset + limit))
      val shuffled = new ShuffledRowRDD(
        ShuffleExchangeExec.prepareShuffleDependency(
          locallyLimited, child.output, SinglePartition, serializer))
      shuffled.mapPartitionsInternal(_.take(offset + limit).drop(offset))
    }
  }
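executeCollect handles the case where the result is collected directly on the driver: it asks the child for at most offset + limit rows and drops the offset prefix. doExecute covers the distributed case by pre-truncating every partition to offset + limit rows, shuffling the survivors into a single partition, and applying the same take-then-drop there.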

/**
    * Physical operator that sorts the child output, skips the first `offset` rows,
    * keeps the next `limit` rows, and applies the given projection.
    * @param offset      number of rows to skip after sorting
    * @param limit       maximum number of rows to return after the offset
    * @param sortOrder   sort order to apply
    * @param projectList expressions to project
    * @param child       child physical plan
    */
  case class SortOffsetLimitAndProjectExec(
      offset: Int,
      limit: Int,
      sortOrder: Seq[SortOrder],
      projectList: Seq[NamedExpression],
      child: SparkPlan) extends UnaryExecNode {

    override def output: Seq[Attribute] = {
      projectList.map(_.toAttribute)
    }

    override def executeCollect(): Array[InternalRow] = {
      val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
      // takeOrdered takes the ordering in its second parameter list; drop the offset prefix afterwards
      val data = child.execute().map(_.copy()).takeOrdered(offset + limit)(ord).drop(offset)
      if (projectList != child.output) {
        val proj = UnsafeProjection.create(projectList, child.output)
        data.map(r => proj(r).copy())
      } else {
        data
      }
    }

    private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

    protected override def doExecute(): RDD[InternalRow] = {
      val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
      // Each partition keeps its top offset + limit rows so the global result stays correct.
      val localTopK: RDD[InternalRow] = {
        child.execute().map(_.copy()).mapPartitions { iter =>
          org.apache.spark.util.collection.Utils.takeOrdered(iter, offset + limit)(ord)
        }
      }
      val shuffled = new ShuffledRowRDD(
        ShuffleExchangeExec.prepareShuffleDependency(
          localTopK, child.output, SinglePartition, serializer))
      shuffled.mapPartitions { iter =>
        // Merge the per-partition results, drop the offset prefix, then project if needed.
        val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), offset + limit)(ord).drop(offset)
        if (projectList != child.output) {
          val proj = UnsafeProjection.create(projectList, child.output)
          topK.map(r => proj(r))
        } else {
          topK
        }
      }
    }

    override def outputOrdering: Seq[SortOrder] = sortOrder

    override def outputPartitioning: Partitioning = SinglePartition

    override def simpleString: String = {
      val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
      val outputString = Utils.truncatedString(output, "[", ",", "]")

      s"SortOffsetLimitAndProject(offset=$offset, limit=$limit, orderBy=$orderByString, output=$outputString)"
    }
  }
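This operator follows the same pattern as OffsetExec, but performs a per-partition top-K of offset + limit rows before the shuffle, so only a small, already-sorted slice of each partition crosses the network; the single reducer then merges the slices, drops the offset prefix, and applies the projection when it differs from the child's output.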

 2.3) In org.apache.spark.sql.catalyst.plans.logical, add the following to basicLogicalOperators.scala:

/**
  * Logical plan node for the OFFSET clause.
  * @param offsetExpr expression for the number of rows to skip
  * @param limitExpr  expression for the row limit
  * @param child      child logical plan
  */
case class Offset(offsetExpr: Expression, limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
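The logical node only carries the two expressions and forwards the child's output unchanged; it is up to a planning strategy to turn it into the physical operators from step 2.2 (see the sketch after step 2.4).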

2.4) In org.apache.spark.sql.catalyst.parser.AstBuilder, add the following to withQueryResultClauses (it takes the place of the existing LIMIT block):


    // OFFSET: when the clause is present, wrap the plan in an Offset node.
    // If no LIMIT was given, the offset expression is reused as a placeholder limit.
    val withOffset: LogicalPlan =
      if (limit != null) {
        withWindow.optional(offset) {
          Offset(typedVisit(offset), typedVisit(limit), withWindow)
        }
      } else {
        withWindow.optional(offset) {
          Offset(typedVisit(offset), typedVisit(offset), withWindow)
        }
      }

    // LIMIT
    withOffset.optional(limit) {
      Limit(typedVisit(limit), withOffset)
    }
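
The four steps above cover the grammar, the physical operators, the logical node and the parser, but the logical Offset still has to be planned into a physical operator. A minimal sketch of what that planning rule could look like is shown below; the strategy name OffsetPlanning and the exact place it is registered (for example alongside SpecialLimits in org.apache.spark.sql.execution.SparkStrategies) are assumptions for illustration, not part of the original walkthrough.

// Hypothetical planning rule: maps the logical Offset node from step 2.3 onto the
// physical operators from step 2.2. The usual SparkStrategies imports are assumed
// (org.apache.spark.sql.catalyst.expressions._, org.apache.spark.sql.catalyst.plans.logical._,
//  org.apache.spark.sql.execution._).
object OffsetPlanning extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // OFFSET over a global sort: sort, skip, limit and project in a single operator.
    case Offset(IntegerLiteral(offset), IntegerLiteral(limit), Sort(order, true, child)) =>
      SortOffsetLimitAndProjectExec(offset, limit, order, child.output, planLater(child)) :: Nil
    // Plain OFFSET without an ordering requirement.
    case Offset(IntegerLiteral(offset), IntegerLiteral(limit), child) =>
      OffsetExec(offset, limit, planLater(child)) :: Nil
    case _ => Nil
  }
}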