1. 程式人生 > 其它 >Flink實戰案例(四十二): Operators(三)FILTER

Flink實戰案例(四十二): Operators(三)FILTER

技術標籤:Flink入門

宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。

《2021年最新版大資料面試題全面開啟更新》

filter

filter轉換運算元通過在每個輸入事件上對一個布林條件進行求值來過濾掉一些元素,然後將剩下的元素繼續傳送。一個true的求值結果將會把輸入事件保留下來併發送到輸出,而如果求值結果為false,則輸入事件會被拋棄掉。我們通過呼叫DataStream.filter()來指定流的filter運算元,filter操作將產生一條新的流,其型別和輸入流中的事件型別是一樣的。圖5-2展示了只產生白色方框的filter操作。

布林條件可以使用函式、FilterFunction介面或者匿名函式來實現。FilterFunction中的泛型是輸入事件的型別。定義的filter()

方法會作用在每一個輸入元素上面,並返回一個布林值。

// T: the type of elements
FilterFunction[T]
    > filter(T): Boolean

例項一:

下面的例子展示瞭如何使用filter來從感測器資料中過濾掉溫度值小於25華氏溫度的讀數。

scala version

val filteredReadings = readings.filter(r => r.temperature >= 25)

java version

DataStream<SensorReading> filteredReadings = readings.filter(r -> r.temperature >= 25);

例項二:

我們可以使用Lambda表示式過濾掉小於等於0的元素:

val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
// 使用 => 構造Lambda表示式
val lambda = dataStream.filter ( input => input > 0 )
// 使用 _ 構造Lambda表示式
val lambda2 = dataStream.map { _ > 0 }

也可以繼承FilterFunctionRichFilterFunction,然後重寫filter方法,我們還可以將引數傳遞給繼承後的類。比如,MyFilterFunction

增加一個建構函式引數limit,並在filter方法中使用這個引數。

// 繼承RichFilterFunction
// limit引數可以從外部傳入
class MyFilterFunction(limit: Int) extends RichFilterFunction[Int] {
  override def filter(input: Int): Boolean = {
    if (input > limit) {
      true
    } else {
      false
    }
  }
  
}
val richFunctionDataStream = dataStream.filter(new MyFilterFunction(2))