1. 程式人生 > 其它 >Flink實戰案例(四十一): Operators(二)MAP

Flink實戰案例(四十一): Operators(二)MAP

技術標籤:Flink入門大資料成神之路

宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。

《2021年最新版大資料面試題全面開啟更新》

MAP

map運算元通過呼叫DataStream.map()來指定。map運算元的使用將會產生一條新的資料流。它會將每一個輸入的事件傳送到一個使用者自定義的mapper,這個mapper只返回一個輸出事件,這個輸出事件和輸入事件的型別可能不一樣。圖5-1展示了一個map運算元,這個map將每一個正方形轉化成了圓形。


MapFunction的型別與輸入事件和輸出事件的型別相關,可以通過實現MapFunction介面來定義。介面包含map()

函式,這個函式將一個輸入事件恰好轉換為一個輸出事件。

// T: the type of input elements
// O: the type of output elements
MapFunction[T, O]
    > map(T): O

例項一:

下面的程式碼實現了將SensorReading中的id欄位抽取出來的功能。

scala version

val readings: DataStream[SensorReading] = ...
val sensorIds: DataStream[String] = readings.map(new IdExtractor)

class IdExtractor extends MapFunction[SensorReading, String] {
    override def map(r: SensorReading) : String = r.id
}

當然我們更推薦匿名函式的寫法。

val sensorIds: DataStream[String] = filteredReadings.map(r => r.id)

java version

DataStream<SensorReading> readings = ...
DataStream<String> sensorIds = readings.map(new IdExtractor());

public static class IdExtractor implements MapFunction<SensorReading, String> {
    @Override
    public String map(SensorReading r) throws Exception {
        return r.id;
    }
}

當然我們更推薦匿名函式的寫法。

DataStream<String> sensorIds = filteredReadings.map(r -> r.id);

例項二:

我們可以重寫MapFunctionRichMapFunction來自定義map函式,RichMapFunction的定義為:RichMapFunction[IN, OUT],其內部有一個map虛擬函式,我們需要對這個虛擬函式重寫。


val dataStream: DataStream[Int] = senv.fromElements(1, 2, -3, 0, 5, -9, 8)

// 繼承RichMapFunction
// 第一個泛型是輸入型別,第二個引數是輸出泛型型別
class DoubleMapFunction extends RichMapFunction[Int, String] {
 override def map(input: Int): String =
  ("overide map Input : " + input.toString + ", Output : " + (input * 2).toString)
}

val richFunctionDataStream = dataStream.map {new DoubleMapFunction()}

上面的程式碼清單重寫了RichMapFunction中的map函式,將輸入結果乘以2,轉化為字串後輸出。我們也可以不用顯示定義DoubleMapFunction這個類,而是使用匿名類:


// 匿名類
val anonymousDataStream = dataStream.map {new RichMapFunction[Int, String] {
 override def map(input: Int): String = {
   ("overide mapInput : " + input.toString + ", Output : " + (input * 2).toString)
  }
}}

自定義map函式最簡便的操作是使用Lambda表示式。

// 使用=>構造Lambda表示式
val lambda = dataStream.map ( input => (input * 2).toDouble )
上面的程式碼清單中,我們對某整數資料流進行操作,輸入元素均為Int,輸出元素均為Double。

也可以使用下劃線來構造Lambda表示式:

// 使用 _ 構造Lambda表示式
val lambda2 = dataStream.map { _.toDouble * 2 }

注意,使用Scala進行Flink程式設計,自定義運算元時可以使用圓括號(),也可以使用花括號{}。

對上面的幾種方式比較可見,Lambda表示式更為簡潔,但是可讀性差,其他人不容易讀懂程式碼邏輯。重寫函式的方式程式碼更為臃腫,但定義更清晰。此外,RichFunction還提供了一系列其他方法,包括openclosegetRuntimeContextsetRuntimeContext等虛擬函式方法,重寫這些方法可以建立狀態資料、對資料進行廣播,獲取累加器和計數器等,