Flink實戰案例(四十二): Operators(三)FILTER
阿新 • • 發佈:2021-01-31
技術標籤:Flink入門
宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。
filter
filter
轉換運算元通過在每個輸入事件上對一個布林條件進行求值來過濾掉一些元素,然後將剩下的元素繼續傳送。一個true
的求值結果將會把輸入事件保留下來併發送到輸出,而如果求值結果為false
,則輸入事件會被拋棄掉。我們通過呼叫DataStream.filter()
來指定流的filter
運算元,filter
操作將產生一條新的流,其型別和輸入流中的事件型別是一樣的。圖5-2展示了只產生白色方框的filter
操作。
布林條件可以使用函式、FilterFunction介面或者匿名函式來實現。FilterFunction中的泛型是輸入事件的型別。定義的filter()
// T: the type of elements FilterFunction[T] > filter(T): Boolean
例項一:
下面的例子展示瞭如何使用filter來從感測器資料中過濾掉溫度值小於25華氏溫度的讀數。
scala version
val filteredReadings = readings.filter(r => r.temperature >= 25)
java version
DataStream<SensorReading> filteredReadings = readings.filter(r -> r.temperature >= 25);
例項二:
我們可以使用Lambda表示式過濾掉小於等於0的元素:
val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
// 使用 => 構造Lambda表示式
val lambda = dataStream.filter ( input => input > 0 )
// 使用 _ 構造Lambda表示式
val lambda2 = dataStream.map { _ > 0 }
也可以繼承FilterFunction
或RichFilterFunction
,然後重寫filter
方法,我們還可以將引數傳遞給繼承後的類。比如,MyFilterFunction
limit
,並在filter
方法中使用這個引數。
// 繼承RichFilterFunction
// limit引數可以從外部傳入
class MyFilterFunction(limit: Int) extends RichFilterFunction[Int] {
override def filter(input: Int): Boolean = {
if (input > limit) {
true
} else {
false
}
}
}
val richFunctionDataStream = dataStream.filter(new MyFilterFunction(2))