linux下top命令詳解
表值聚合函式
自定義表值聚合函式(UDTAGG)可以把一個表(一行或者多行,每行有一列或者多列)聚合成另一張表,結果中可以有多行多列。
上圖展示了一個表值聚合函式的例子。假設你有一個飲料的表,這個表有 3 列,分別是id
、name
和price
,一共有 5 行。假設你需要找到價格最高的兩個飲料,類似於top2()
表值聚合函式。你需要遍歷所有 5 行資料,結果是有 2 行資料的一個表。
使用者自定義表值聚合函式是通過擴充套件TableAggregateFunction
類來實現的。一個TableAggregateFunction
的工作過程如下。首先,它需要一個accumulator
,這個accumulator
TableAggregateFunction
的createAccumulator
方法來構造一個空的 accumulator。接下來,對於每一行資料,會呼叫accumulate
方法來更新 accumulator。當所有資料都處理完之後,呼叫emitValue
方法來計算和返回最終的結果。
下面幾個TableAggregateFunction
的方法是必須要實現的:
createAccumulator()
accumulate()
Flink 的型別推導在遇到複雜型別的時候可能會推匯出錯誤的結果,比如那些非基本型別和普通的 POJO 型別的複雜型別。所以類似於ScalarFunction
TableFunction
,TableAggregateFunction
也提供了TableAggregateFunction#getResultType()
和TableAggregateFunction#getAccumulatorType()
方法來指定返回值型別和 accumulator 的型別,這兩個方法都需要返回TypeInformation
。
除了上面的方法,還有幾個其他的方法可以選擇性的實現。有些方法可以讓查詢更加高效,而有些方法對於某些特定場景是必須要實現的。比如,在會話視窗(當兩個會話視窗合併時會合並兩個 accumulator)中使用聚合函式時,必須要實現merge()
方法。
下面幾個TableAggregateFunction
的方法在某些特定場景下是必須要實現的:
retract()
在 boundedOVER
視窗中的聚合函式必須要實現。merge()
在許多批式聚合和會話視窗聚合中是必須要實現的。resetAccumulator()
在許多批式聚合中是必須要實現的。emitValue()
在批式聚合以及視窗聚合中是必須要實現的。
下面的TableAggregateFunction
的方法可以提升流式任務的效率:
emitUpdateWithRetract()
在 retract 模式下,該方法負責傳送被更新的值。
emitValue
方法會發送所有 accumulator 給出的結果。拿 TopN 來說,emitValue
每次都會發送所有的最大的 n 個值。這在流式任務中可能會有一些效能問題。為了提升效能,使用者可以實現emitUpdateWithRetract
方法。這個方法在 retract 模式下會增量的輸出結果,比如有資料更新了,我們必須要撤回老的資料,然後再發送新的資料。如果定義了emitUpdateWithRetract
方法,那它會優先於emitValue
方法被使用,因為一般認為emitUpdateWithRetract
會更加高效,因為它的輸出是增量的。
TableAggregateFunction
的所有方法都必須是public
的、非static
的,而且名字必須跟上面提到的一樣。createAccumulator
、getResultType
和getAccumulatorType
這三個方法是在抽象父類TableAggregateFunction
中定義的,而其他的方法都是約定的方法。要實現一個表值聚合函式,你必須擴充套件org.apache.flink.table.functions.TableAggregateFunction
,並且實現一個(或者多個)accumulate
方法。accumulate
方法可以有多個過載的方法,也可以支援變長引數。
TableAggregateFunction
的所有方法的詳細文件如下。
/** * Base class for user-defined aggregates and table aggregates. * * @tparam T the type of the aggregation result. * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the * aggregated values which are needed to compute an aggregation result. */ abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction { /** * Creates and init the Accumulator for this (table)aggregate function. * * @return the accumulator with the initial value */ def createAccumulator(): ACC // MANDATORY /** * Returns the TypeInformation of the (table)aggregate function's result. * * @return The TypeInformation of the (table)aggregate function's result or null if the result * type should be automatically inferred. */ def getResultType: TypeInformation[T] = null // PRE-DEFINED /** * Returns the TypeInformation of the (table)aggregate function's accumulator. * * @return The TypeInformation of the (table)aggregate function's accumulator or null if the * accumulator type should be automatically inferred. */ def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED } /** * Base class for table aggregation functions. * * @tparam T the type of the aggregation result * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the * aggregated values which are needed to compute an aggregation result. * TableAggregateFunction represents its state using accumulator, thereby the state of * the TableAggregateFunction must be put into the accumulator. */ abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, ACC] { /** * Processes the input values and update the provided accumulator instance. The method * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction * requires at least one accumulate() method. * * @param accumulator the accumulator which contains the current aggregated results * @param [user defined inputs] the input value (usually obtained from a new arrived data). */ def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY /** * Retracts the input values from the accumulator instance. The current design assumes the * inputs are the values that have been previously accumulated. The method retract can be * overloaded with different custom types and arguments. This function must be implemented for * datastream bounded over aggregate. * * @param accumulator the accumulator which contains the current aggregated results * @param [user defined inputs] the input value (usually obtained from a new arrived data). */ def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL /** * Merges a group of accumulator instances into one accumulator instance. This function must be * implemented for datastream session window grouping aggregate and dataset grouping aggregate. * * @param accumulator the accumulator which will keep the merged aggregate results. It should * be noted that the accumulator may contain the previous aggregated * results. Therefore user should not replace or clean this instance in the * custom merge method. * @param its an [[java.lang.Iterable]] pointed to a group of accumulators that will be * merged. */ def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL /** * Called every time when an aggregation result should be materialized. The returned value * could be either an early and incomplete result (periodically emitted as data arrive) or * the final result of the aggregation. * * @param accumulator the accumulator which contains the current * aggregated results * @param out the collector used to output data */ def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL /** * Called every time when an aggregation result should be materialized. The returned value * could be either an early and incomplete result (periodically emitted as data arrive) or * the final result of the aggregation. * * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated. * This method outputs data incrementally in retract mode, i.e., once there is an update, we * have to retract old records before sending new updated ones. The emitUpdateWithRetract * method will be used in preference to the emitValue method if both methods are defined in the * table aggregate function, because the method is treated to be more efficient than emitValue * as it can outputvalues incrementally. * * @param accumulator the accumulator which contains the current * aggregated results * @param out the retractable collector used to output data. Use collect method * to output(add) records and use retract method to retract(delete) * records. */ def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit // OPTIONAL /** * Collects a record and forwards it. The collector can output retract messages with the retract * method. Note: only use it in `emitRetractValueIncrementally`. */ trait RetractableCollector[T] extends Collector[T] { /** * Retract a record. * * @param record The record to retract. */ def retract(record: T): Unit } }View Code
下面的例子展示瞭如何
- 定義一個
TableAggregateFunction
來計算給定列的最大的 2 個值, - 在
TableEnvironment
中註冊函式, - 在 Table API 查詢中使用函式(當前只在 Table API 中支援 TableAggregateFunction)。
為了計算最大的 2 個值,accumulator 需要儲存當前看到的最大的 2 個值。在我們的例子中,我們定義了類Top2Accum
來作為 accumulator。Flink 的 checkpoint 機制會自動儲存 accumulator,並且在失敗時進行恢復,來保證精確一次的語義。
我們的Top2
表值聚合函式(TableAggregateFunction
)的accumulate()
方法有兩個輸入,第一個是Top2Accum
accumulator,另一個是使用者定義的輸入:輸入的值v
。儘管merge()
方法在大多數聚合型別中不是必須的,我們也在樣例中提供了它的實現。請注意,我們在 Scala 樣例中也使用的是 Java 的基礎型別,並且定義了getResultType()
和getAccumulatorType()
方法,因為 Flink 的型別推導對於 Scala 的型別推導支援的不是很好。
import java.lang.{Integer => JInteger} import org.apache.flink.table.api.Types import org.apache.flink.table.functions.TableAggregateFunction /** * Accumulator for top2. */ class Top2Accum { var first: JInteger = _ var second: JInteger = _ } /** * The top2 user-defined table aggregate function. */ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] { override def createAccumulator(): Top2Accum = { val acc = new Top2Accum acc.first = Int.MinValue acc.second = Int.MinValue acc } def accumulate(acc: Top2Accum, v: Int) { if (v > acc.first) { acc.second = acc.first acc.first = v } else if (v > acc.second) { acc.second = v } } def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = { val iter = its.iterator() while (iter.hasNext) { val top2 = iter.next() accumulate(acc, top2.first) accumulate(acc, top2.second) } } def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = { // emit the value and rank if (acc.first != Int.MinValue) { out.collect(JTuple2.of(acc.first, 1)) } if (acc.second != Int.MinValue) { out.collect(JTuple2.of(acc.second, 2)) } } } // 初始化表 val tab = ... // 使用函式 tab .groupBy('key) .flatAggregate(top2('a) as ('v, 'rank)) .select('key, 'v, 'rank)
下面的例子展示瞭如何使用emitUpdateWithRetract
方法來只發送更新的資料。為了只發送更新的結果,accumulator 儲存了上一次的最大的2個值,也儲存了當前最大的2個值。注意:如果 TopN 中的 n 非常大,這種既儲存上次的結果,也儲存當前的結果的方式不太高效。一種解決這種問題的方式是把輸入資料直接儲存到accumulator
中,然後在呼叫emitUpdateWithRetract
方法時再進行計算。
import java.lang.{Integer => JInteger} import org.apache.flink.table.api.Types import org.apache.flink.table.functions.TableAggregateFunction /** * Accumulator for top2. */ class Top2Accum { var first: JInteger = _ var second: JInteger = _ var oldFirst: JInteger = _ var oldSecond: JInteger = _ } /** * The top2 user-defined table aggregate function. */ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] { override def createAccumulator(): Top2Accum = { val acc = new Top2Accum acc.first = Int.MinValue acc.second = Int.MinValue acc.oldFirst = Int.MinValue acc.oldSecond = Int.MinValue acc } def accumulate(acc: Top2Accum, v: Int) { if (v > acc.first) { acc.second = acc.first acc.first = v } else if (v > acc.second) { acc.second = v } } def emitUpdateWithRetract( acc: Top2Accum, out: RetractableCollector[JTuple2[JInteger, JInteger]]) : Unit = { if (acc.first != acc.oldFirst) { // if there is an update, retract old value then emit new value. if (acc.oldFirst != Int.MinValue) { out.retract(JTuple2.of(acc.oldFirst, 1)) } out.collect(JTuple2.of(acc.first, 1)) acc.oldFirst = acc.first } if (acc.second != acc.oldSecond) { // if there is an update, retract old value then emit new value. if (acc.oldSecond != Int.MinValue) { out.retract(JTuple2.of(acc.oldSecond, 2)) } out.collect(JTuple2.of(acc.second, 2)) acc.oldSecond = acc.second } } } // 初始化表 val tab = ... // 使用函式 tab .groupBy('key) .flatAggregate(top2('a) as ('v, 'rank)) .select('key, 'v, 'rank)