A Hands-On Guide to AspNetCore WebApi: Authentication and Authorization

Table Aggregate Functions

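A user-defined table aggregate function (UDTAGG) aggregates a table (one or more rows, each with one or more columns) into another table. The result can have multiple rows and multiple columns.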

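As an example of a table aggregation, suppose you have a beverage table with three columns, id, name, and price, and five rows. You want to find the two most expensive beverages, which is what a top2() table aggregate function would do. You have to scan all five rows, and the result is a table with two rows.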
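The top2 idea can be illustrated without Flink at all. The following is a minimal standalone sketch in plain Scala; the Beverage case class, the sample rows, and the Top2ByPrice object are all hypothetical names and data made up for illustration, not part of any Flink API.

```scala
// Hypothetical row type for the beverage table described above.
final case class Beverage(id: Int, name: String, price: Int)

object Top2ByPrice {
  // Illustrative data only; names and prices are assumptions for this sketch.
  val beverages: List[Beverage] = List(
    Beverage(1, "Latte", 6),
    Beverage(2, "Milk", 3),
    Beverage(3, "Breve", 5),
    Beverage(4, "Mocha", 8),
    Beverage(5, "Tea", 4)
  )

  // Scan all rows, keep the two highest prices, and pair each with a 1-based rank.
  def top2(rows: List[Beverage]): List[(Int, Int)] =
    rows
      .map(_.price)
      .sorted(Ordering[Int].reverse)
      .take(2)
      .zipWithIndex
      .map { case (price, idx) => (price, idx + 1) }
}
```

Five input rows go in and a two-row result of (price, rank) pairs comes out, which is the "many rows to many rows" shape that distinguishes a table aggregate from a scalar aggregate.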

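A user-defined table aggregate function is implemented by extending the TableAggregateFunction class. A TableAggregateFunction works as follows. First, it needs an accumulator, which is the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the createAccumulator method of the TableAggregateFunction. Next, the accumulate method is called for each input row to update the accumulator. Once all rows have been processed, the emitValue method is called to compute and return the final result.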
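The call order described above can be sketched in plain Scala. This is a standalone simulation under stated assumptions, not the Flink runtime: the LifecycleSketch object, its Acc class, the run driver, and the use of a callback in place of a collector are all hypothetical.

```scala
import scala.collection.mutable.ListBuffer

object LifecycleSketch {
  // Hypothetical accumulator: the two largest values seen so far.
  final class Acc {
    var first: Int = Int.MinValue
    var second: Int = Int.MinValue
  }

  // Step 1: build an empty accumulator.
  def createAccumulator(): Acc = new Acc

  // Step 2: called once per input row to update the accumulator.
  def accumulate(acc: Acc, v: Int): Unit = {
    if (v > acc.first) {
      acc.second = acc.first
      acc.first = v
    } else if (v > acc.second) {
      acc.second = v
    }
  }

  // Step 3: emit (value, rank) pairs; `out` stands in for a collector.
  def emitValue(acc: Acc, out: ((Int, Int)) => Unit): Unit = {
    if (acc.first != Int.MinValue) out((acc.first, 1))
    if (acc.second != Int.MinValue) out((acc.second, 2))
  }

  // Hypothetical driver that plays the role of the runtime for one group of rows.
  def run(rows: Seq[Int]): List[(Int, Int)] = {
    val acc = createAccumulator()
    rows.foreach(v => accumulate(acc, v))
    val buf = ListBuffer.empty[(Int, Int)]
    emitValue(acc, r => buf += r)
    buf.toList
  }
}
```

Calling LifecycleSketch.run(Seq(6, 3, 5, 8, 4)) walks through all three steps for a single group.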

The following TableAggregateFunction methods are mandatory:

  • createAccumulator()
  • accumulate()

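Flink's type extraction can produce wrong results for complex types, that is, types that are neither basic types nor simple POJOs. So, as with ScalarFunction and TableFunction, TableAggregateFunction provides the methods TableAggregateFunction#getResultType() and TableAggregateFunction#getAccumulatorType() to specify the result type and the accumulator type. Both methods must return a TypeInformation.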

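Besides the methods above, there are a few others that can be implemented optionally. Some of them make queries more efficient, while others are mandatory in certain scenarios. For example, the merge() method is mandatory when the aggregate function is used in a session window, because the two accumulators must be merged whenever two session windows are merged.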

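The following TableAggregateFunction methods are mandatory in certain scenarios:

  • retract() is required for aggregations on bounded OVER windows.
  • merge() is required for many batch aggregations and for session window aggregations.
  • resetAccumulator() is required for many batch aggregations.
  • emitValue() is required for batch and window aggregations.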
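To make the merge() case concrete, here is a minimal standalone sketch in plain Scala of combining accumulators, as would be needed when two session windows are merged. The MergeSketch object and the Top2Acc case class are hypothetical names for illustration; the method shape only loosely mirrors the Flink contract.

```scala
object MergeSketch {
  // Hypothetical accumulator holding the two largest values seen so far.
  final case class Top2Acc(var first: Int = Int.MinValue, var second: Int = Int.MinValue)

  def accumulate(acc: Top2Acc, v: Int): Unit = {
    if (v > acc.first) {
      acc.second = acc.first
      acc.first = v
    } else if (v > acc.second) {
      acc.second = v
    }
  }

  // Fold every other accumulator into `acc`. `acc` may already hold results,
  // so it is updated in place rather than reset or replaced.
  def merge(acc: Top2Acc, others: Iterable[Top2Acc]): Unit =
    others.foreach { other =>
      // An unset slot is Int.MinValue, which accumulate ignores.
      accumulate(acc, other.first)
      accumulate(acc, other.second)
    }
}
```

Reusing accumulate inside merge keeps the ordering logic in one place; the cost is two comparisons per merged accumulator, which is negligible for a top-2.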

The following TableAggregateFunction method can improve the efficiency of streaming jobs:

  • emitUpdateWithRetract() emits the values that have been updated, in retract mode.

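The emitValue method emits the full result according to the accumulator. Taking TopN as an example, emitValue emits all n largest values every time, which can cause performance problems in streaming jobs. To improve performance, you can implement the emitUpdateWithRetract method. It outputs results incrementally in retract mode: when a value is updated, the old record is retracted before the new one is sent. If emitUpdateWithRetract is defined, it is used in preference to emitValue, because its incremental output is generally considered more efficient.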
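The difference between the two emission styles can be sketched in plain Scala. This is a standalone simulation, not Flink code: the EmitModes object, its Acc class, and the string encoding of messages ("+" for an added record, "-" for a retracted one) are assumptions made for illustration.

```scala
object EmitModes {
  // Hypothetical accumulator: current top 2 plus the top 2 that was last emitted.
  final class Acc {
    var first: Int = Int.MinValue
    var second: Int = Int.MinValue
    var oldFirst: Int = Int.MinValue
    var oldSecond: Int = Int.MinValue
  }

  def accumulate(acc: Acc, v: Int): Unit = {
    if (v > acc.first) {
      acc.second = acc.first
      acc.first = v
    } else if (v > acc.second) {
      acc.second = v
    }
  }

  // Full emission: every current result is sent, changed or not.
  def emitValue(acc: Acc): List[String] =
    List((acc.first, 1), (acc.second, 2)).collect {
      case (v, rank) if v != Int.MinValue => s"+($v,$rank)"
    }

  // Incremental emission: only changed ranks are retracted and re-sent.
  def emitUpdateWithRetract(acc: Acc): List[String] = {
    def diff(old: Int, cur: Int, rank: Int): List[String] = {
      if (cur == old) Nil
      else {
        val retraction = if (old != Int.MinValue) List(s"-($old,$rank)") else Nil
        retraction :+ s"+($cur,$rank)"
      }
    }
    val out = diff(acc.oldFirst, acc.first, 1) ++ diff(acc.oldSecond, acc.second, 2)
    acc.oldFirst = acc.first
    acc.oldSecond = acc.second
    out
  }
}
```

When an input does not change the top 2, the incremental variant emits nothing at all, while the full variant still re-sends both rows.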

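All methods of a TableAggregateFunction must be declared public and non-static, and must be named exactly as listed above. The methods createAccumulator, getResultType, and getAccumulatorType are defined in the abstract parent class of TableAggregateFunction, while the others are contract methods, found by name. To define a table aggregate function, you must extend org.apache.flink.table.functions.TableAggregateFunction and implement one or more accumulate methods. The accumulate method can be overloaded with different parameter types, and it can take variable arguments.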
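The overloading and variable-argument options for accumulate can be illustrated with ordinary Scala method overloads. This is a standalone sketch in plain Scala; the OverloadedAccumulate object, the SumAcc class, and the three signatures are hypothetical and chosen only to show the shapes involved, not taken from Flink.

```scala
object OverloadedAccumulate {
  // Hypothetical accumulator that keeps a running sum.
  final class SumAcc { var sum: Long = 0L }

  // Overload 1: a single integer input.
  def accumulate(acc: SumAcc, v: Int): Unit =
    acc.sum += v

  // Overload 2: two inputs with different types.
  def accumulate(acc: SumAcc, v: Long, weight: Int): Unit =
    acc.sum += v * weight

  // Overload 3: a variable number of string inputs, each parsed as a number.
  def accumulate(acc: SumAcc, vs: String*): Unit =
    vs.foreach(s => acc.sum += s.trim.toLong)
}
```

Each call site picks the overload that matches its argument types, so one function class can serve columns of different types and arities.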

Detailed documentation for all methods of TableAggregateFunction is given below.

/**
  * Base class for user-defined aggregates and table aggregates.
  *
  * @tparam T   the type of the aggregation result.
  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  */
abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction {

  /**
    * Creates and initializes the accumulator for this (table)aggregate function.
    *
    * @return the accumulator with the initial value
    */
  def createAccumulator(): ACC // MANDATORY

  /**
    * Returns the TypeInformation of the (table)aggregate function's result.
    *
    * @return The TypeInformation of the (table)aggregate function's result or null if the result
    *         type should be automatically inferred.
    */
  def getResultType: TypeInformation[T] = null // PRE-DEFINED

  /**
    * Returns the TypeInformation of the (table)aggregate function's accumulator.
    *
    * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
}

/**
  * Base class for table aggregation functions.
  *
  * @tparam T   the type of the aggregation result
  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  *             TableAggregateFunction represents its state using accumulator, thereby the state of
  *             the TableAggregateFunction must be put into the accumulator.
  */
abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, ACC] {

  /**
    * Processes the input values and updates the provided accumulator instance. The method
    * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
    * requires at least one accumulate() method.
    *
    * @param accumulator           the accumulator which contains the current aggregated results
    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
    */
  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY

  /**
    * Retracts the input values from the accumulator instance. The current design assumes the
    * inputs are the values that have been previously accumulated. The method retract can be
    * overloaded with different custom types and arguments. This function must be implemented for
    * datastream bounded over aggregate.
    *
    * @param accumulator           the accumulator which contains the current aggregated results
    * @param [user defined inputs] the input value (usually obtained from a new arrived data).
    */
  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL

  /**
    * Merges a group of accumulator instances into one accumulator instance. This function must be
    * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
    *
    * @param accumulator  the accumulator which will keep the merged aggregate results. It should
    *                     be noted that the accumulator may contain the previous aggregated
    *                     results. Therefore user should not replace or clean this instance in the
    *                     custom merge method.
    * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
    *                     merged.
    */
  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL

  /**
    * Called every time when an aggregation result should be materialized. The returned value
    * could be either an early and incomplete result  (periodically emitted as data arrive) or
    * the final result of the  aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @param out         the collector used to output data
    */
  def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL

  /**
    * Called every time when an aggregation result should be materialized. The returned value
    * could be either an early and incomplete result (periodically emitted as data arrive) or
    * the final result of the aggregation.
    *
    * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
    * This method outputs data incrementally in retract mode, i.e., once there is an update, we
    * have to retract old records before sending new updated ones. The emitUpdateWithRetract
    * method will be used in preference to the emitValue method if both methods are defined in the
    * table aggregate function, because the method is treated to be more efficient than emitValue
    * as it can output values incrementally.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @param out         the retractable collector used to output data. Use collect method
    *                    to output(add) records and use retract method to retract(delete)
    *                    records.
    */
  def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit // OPTIONAL

  /**
    * Collects a record and forwards it. The collector can output retract messages with the retract
    * method. Note: only use it in `emitUpdateWithRetract`.
    */
  trait RetractableCollector[T] extends Collector[T] {

    /**
      * Retract a record.
      *
      * @param record The record to retract.
      */
    def retract(record: T): Unit
  }
}

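The following example shows how to

  • define a TableAggregateFunction that calculates the top 2 values of a given column,
  • register the function in the TableEnvironment, and
  • use the function in a Table API query (TableAggregateFunction is currently supported only in the Table API).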

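To calculate the top 2 values, the accumulator needs to store the two largest values seen so far. In our example we define a class Top2Accum as the accumulator. Accumulators are automatically backed up by Flink's checkpointing mechanism and restored on failure, which ensures exactly-once semantics.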

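The accumulate() method of our Top2 table aggregate function (TableAggregateFunction) has two inputs: the first is the Top2Accum accumulator, and the second is the user-defined input, the value v. Although the merge() method is not mandatory for most aggregation types, the example provides an implementation of it. Note that the Scala example uses Java boxed types. Because Flink's type extraction does not work well with Scala types, getResultType() and getAccumulatorType() may also need to be overridden.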

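import java.lang.{Integer => JInteger, Iterable => JIterable}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.util.Collector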

/**
 * Accumulator for top2.
 */
class Top2Accum {
  var first: JInteger = _
  var second: JInteger = _
}

/**
 * The top2 user-defined table aggregate function.
 */
class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {

  override def createAccumulator(): Top2Accum = {
    val acc = new Top2Accum
    acc.first = Int.MinValue
    acc.second = Int.MinValue
    acc
  }

  def accumulate(acc: Top2Accum, v: Int): Unit = {
    if (v > acc.first) {
      acc.second = acc.first
      acc.first = v
    } else if (v > acc.second) {
      acc.second = v
    }
  }

  def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      val top2 = iter.next()
      accumulate(acc, top2.first)
      accumulate(acc, top2.second)
    }
  }

  def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = {
    // emit the value and rank
    if (acc.first != Int.MinValue) {
      out.collect(JTuple2.of(acc.first, 1))
    }
    if (acc.second != Int.MinValue) {
      out.collect(JTuple2.of(acc.second, 2))
    }
  }
}

// initialize the table
val tab = ...

// create an instance of the function
val top2 = new Top2

// use the function
tab
  .groupBy('key)
  .flatAggregate(top2('a) as ('v, 'rank))
  .select('key, 'v, 'rank)

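The following example shows how to use the emitUpdateWithRetract method to emit only the updated results. To do so, the accumulator keeps both the previous top 2 values and the current top 2 values. Note: if n in TopN is large, keeping both the old and the new results is inefficient. One way to address this is to store the input records directly in the accumulator in the accumulate method and perform the calculation when emitUpdateWithRetract is called.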

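import java.lang.{Integer => JInteger}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.table.functions.TableAggregateFunction.RetractableCollector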

/**
 * Accumulator for top2.
 */
class Top2Accum {
  var first: JInteger = _
  var second: JInteger = _
  var oldFirst: JInteger = _
  var oldSecond: JInteger = _
}

/**
 * The top2 user-defined table aggregate function.
 */
class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] {

  override def createAccumulator(): Top2Accum = {
    val acc = new Top2Accum
    acc.first = Int.MinValue
    acc.second = Int.MinValue
    acc.oldFirst = Int.MinValue
    acc.oldSecond = Int.MinValue
    acc
  }

  def accumulate(acc: Top2Accum, v: Int): Unit = {
    if (v > acc.first) {
      acc.second = acc.first
      acc.first = v
    } else if (v > acc.second) {
      acc.second = v
    }
  }

  def emitUpdateWithRetract(
    acc: Top2Accum,
    out: RetractableCollector[JTuple2[JInteger, JInteger]])
  : Unit = {
    if (acc.first != acc.oldFirst) {
      // if there is an update, retract old value then emit new value.
      if (acc.oldFirst != Int.MinValue) {
        out.retract(JTuple2.of(acc.oldFirst, 1))
      }
      out.collect(JTuple2.of(acc.first, 1))
      acc.oldFirst = acc.first
    }
    if (acc.second != acc.oldSecond) {
      // if there is an update, retract old value then emit new value.
      if (acc.oldSecond != Int.MinValue) {
        out.retract(JTuple2.of(acc.oldSecond, 2))
      }
      out.collect(JTuple2.of(acc.second, 2))
      acc.oldSecond = acc.second
    }
  }
}

// initialize the table
val tab = ...

// create an instance of the function
val top2 = new Top2

// use the function
tab
  .groupBy('key)
  .flatAggregate(top2('a) as ('v, 'rank))
  .select('key, 'v, 'rank)
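The note above about large n suggests buffering input in the accumulator and deferring the computation to emit time. One possible reading of that idea is sketched below in plain Scala, without any Flink dependency. The BufferedTopN object, its Acc class, the Msg types, and the choice to return messages as a list instead of writing to a collector are all assumptions made for illustration.

```scala
import scala.collection.mutable

object BufferedTopN {
  sealed trait Msg
  final case class Collect(value: Int, rank: Int) extends Msg
  final case class Retract(value: Int, rank: Int) extends Msg

  // Hypothetical accumulator: one top-n list plus the inputs not yet merged into it.
  final class Acc {
    var top: Vector[Int] = Vector.empty // last emitted top-n, in descending order
    val pending: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer.empty
  }

  // accumulate only buffers the input; no ranking work happens here.
  def accumulate(acc: Acc, v: Int): Unit = {
    acc.pending += v
  }

  // Ranking happens at emit time; only ranks whose value changed produce messages.
  def emitUpdateWithRetract(acc: Acc, n: Int): List[Msg] = {
    val next = (acc.top ++ acc.pending).sorted(Ordering[Int].reverse).take(n)
    val msgs = mutable.ListBuffer.empty[Msg]
    for (i <- next.indices) {
      val old = acc.top.lift(i) // None if this rank was never emitted
      if (!old.contains(next(i))) {
        old.foreach(o => msgs += Retract(o, i + 1))
        msgs += Collect(next(i), i + 1)
      }
    }
    acc.top = next
    acc.pending.clear()
    msgs.toList
  }
}
```

This variant keeps a single top-n list instead of an old and a new copy, at the price of sorting the buffered inputs on every emit. Whether that trade is worthwhile depends on how often results are emitted relative to how many rows arrive.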