Flink實戰案例(四十一): Operators(二)MAP
宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。
MAP
map
運算元通過呼叫DataStream.map()
來指定。map
運算元的使用將會產生一條新的資料流。它會將每一個輸入的事件傳送到一個使用者自定義的mapper,這個mapper只返回一個輸出事件,這個輸出事件和輸入事件的型別可能不一樣。圖5-1展示了一個map運算元,這個map將每一個正方形轉化成了圓形。
MapFunction
的型別與輸入事件和輸出事件的型別相關,可以通過實現MapFunction
介面來定義。介面包含map()
// T: the type of input elements // O: the type of output elements MapFunction[T, O] > map(T): O
例項一:
下面的程式碼實現了將SensorReading中的id欄位抽取出來的功能。
scala version
val readings: DataStream[SensorReading] = ... val sensorIds: DataStream[String] = readings.map(new IdExtractor) class IdExtractor extends MapFunction[SensorReading, String] { override def map(r: SensorReading) : String = r.id }
當然我們更推薦匿名函式的寫法。
val sensorIds: DataStream[String] = filteredReadings.map(r => r.id)
java version
DataStream<SensorReading> readings = ... DataStream<String> sensorIds = readings.map(new IdExtractor()); public static class IdExtractor implements MapFunction<SensorReading, String> { @Override public String map(SensorReading r) throws Exception { return r.id; } }
當然我們更推薦匿名函式的寫法。
DataStream<String> sensorIds = filteredReadings.map(r -> r.id);
例項二:
我們可以重寫MapFunction
或RichMapFunction
來自定義map函式,RichMapFunction的定義為:RichMapFunction[IN, OUT]
,其內部有一個map
虛擬函式,我們需要對這個虛擬函式重寫。
val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)
// 繼承RichMapFunction
// 第一個泛型是輸入型別,第二個引數是輸出泛型型別
class DoubleMapFunction extends RichMapFunction[Int, String] {
override def map(input: Int): String =
("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
}
val richFunctionDataStream = dataStream.map {new DoubleMapFunction()}
上面的程式碼清單重寫了RichMapFunction
中的map
函式,將輸入結果乘以2,轉化為字串後輸出。我們也可以不用顯示定義DoubleMapFunction
這個類,而是使用匿名類:
// 匿名類
val anonymousDataStream = dataStream.map {new RichMapFunction[Int, String] {
override def map(input: Int): String = {
("overide mapInput : " + input.toString + ", Output : " + (input * 2).toString)
}
}}
自定義map函式最簡便的操作是使用Lambda表示式。
// 使用=>構造Lambda表示式
val lambda = dataStream.map ( input => (input * 2).toDouble )
上面的程式碼清單中,我們對某整數資料流進行操作,輸入元素均為Int,輸出元素均為Double。
也可以使用下劃線來構造Lambda表示式:
// 使用 _ 構造Lambda表示式
val lambda2 = dataStream.map { _.toDouble * 2 }
注意,使用Scala進行Flink程式設計,自定義運算元時可以使用圓括號(),也可以使用花括號{}。
對上面的幾種方式比較可見,Lambda表示式更為簡潔,但是可讀性差,其他人不容易讀懂程式碼邏輯。重寫函式的方式程式碼更為臃腫,但定義更清晰。此外,RichFunction還提供了一系列其他方法,包括open
、close
、getRuntimeContext
和setRuntimeContext
等虛擬函式方法,重寫這些方法可以建立狀態資料、對資料進行廣播,獲取累加器和計數器等,