1. 程式人生 > 其它 >Flink實戰案例(四十三): Operators(四)FLATMAP

Flink實戰案例(四十三): Operators(四)FLATMAP

技術標籤:Flink入門

宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。

《2021年最新版大資料面試題全面開啟更新》

FLATMAP

flatMap運算元和map運算元很類似,不同之處在於針對每一個輸入事件flatMap可以生成0個、1個或者多個輸出元素。事實上,flatMap轉換運算元是filtermap的泛化。所以flatMap可以實現mapfilter運算元的功能。圖5-3展示了flatMap如何根據輸入事件的顏色來做不同的處理。如果輸入事件是白色方框,則直接輸出。輸入元素是黑框,則複製輸入。灰色方框會被過濾掉。

flatMap運算元將會應用在每一個輸入事件上面。對應的FlatMapFunction

定義了flatMap()方法,這個方法返回0個、1個或者多個事件到一個Collector集合中,作為輸出結果。

// T: the type of input elements
// O: the type of output elements
FlatMapFunction[T, O]
    > flatMap(T, Collector[O]): Unit

例項一:

下面的例子展示了在資料分析教程中經常用到的例子,我們用flatMap來實現。使用_來切割感測器ID,比如sensor_1

scala version

class IdSplitter extends FlatMapFunction[String, String] {
    override def flatMap(id: String, out: Collector[String]) : Unit = {
        val arr = id.split("_")
        arr.foreach(out.collect)
    }
}

匿名函式寫法

val splitIds = sensorIds
  .flatMap(r => r.split("_"))

java version

public static class IdSplitter implements FlatMapFunction<String, String> {
@Override
public void flatMap(String id, Collector<String> out) {

String[] splits = id.split("_");

for (String split : splits) {

out.collect(split);
}
}
}

匿名函式寫法:

DataStream<String> splitIds = sensorIds
        .flatMap((FlatMapFunction<String, String>)
                (id, out) -> { for (String s: id.split("_")) { out.collect(s);}})
        // provide result type because Java cannot infer return type of lambda function
        // 提供結果的型別,因為Java無法推斷匿名函式的返回值型別
        .returns(Types.STRING);

例項二:

函式類

  對於mapflatMapreduce等方法,我們可以實現MapFunctionFlatMapFunctionReduceFunction等interface介面。這些函式類簽名中都有泛型引數,用來定義該函式的輸入或輸出的資料型別。我們要繼承這些類,並重寫裡面的自定義函式。以flatMap對應的FlatMapFunction為例,它在原始碼中的定義為:

public interface FlatMapFunction<T, O> extends Function, Serializable {

    void flatMap(T value, Collector<O> out) throws Exception;
  
}

這是一個介面類,它繼承了Flink的Function函式式介面。函式式介面只有一個抽象函式方法(Single Abstract Method),其目的是為了方便Java 8 Lambda表示式的使用。此外,它還繼承了Serializable,以便進行序列化,這是因為這些函式在執行過程中要傳送到各個TaskManager上,傳送前後要進行序列化和反序列化。需要注意的是,使用這些函式時,一定要保證函式內的所有內容都可以被序列化。如果有一些不能被序列化的內容,或者使用接下來介紹的Rich函式類,或者重寫Java的序列化和反序列化方法。

進一步觀察FlatMapFunction發現,這個這個函式有兩個泛型T和O,T是輸入,O是輸出,在使用時,要設定好對應的輸入和輸出資料型別。自定義函式最終歸結為重寫函式flatMap,函式的兩個引數也與輸入輸出的泛型型別對應,即引數value的是flatMap的輸入,資料型別是T,引數out是flatMap的輸出,我們需要將型別為O的資料寫入out。

我們繼承FlatMapFunction,並實現flatMap,只對長度大於limit的字串切詞:

// 使用FlatMapFunction實現過濾邏輯,只對字串長度大於 limit 的內容進行切詞
class WordSplitFlatMap(limit: Int) extends FlatMapFunction[String, String] {
  override def flatMap(value: String, out: Collector[String]): Unit = {
    // split返回一個Array
    // 將Array中的每個元素使用Collector.collect收集起來,起到將列表展平的效果
    if (value.size > limit) {
      value.split(" ").foreach(out.collect)
    }
  }
}

val dataStream: DataStream[String] = senv.fromElements("Hello World", "Hello this is Flink")

val function = dataStream.flatMap(new WordSplitFlatMap(10))

其中,Collector起著收集輸出的作用。

Lambda表示式

當不需要處理非常複雜的業務邏輯時,使用Lambda表示式可能是更好的選擇,Lambda表示式能讓程式碼更簡潔緊湊。Java 8和Scala都對Lambda表示式支援非常好。

對於flatMap,Flink的Scala原始碼有三種定義,我們先看一下第一種的定義:

def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R] = {...}

flatMap輸入是泛型T,輸出是泛型R,接收一個名為fun的Lambda表示式,fun形如(T, Collector[R] => {...})

我們繼續以切詞為例,Lambda表示式為:

val lambda = dataStream.flatMap{
  (value: String, out: Collector[String]) => {
    if (value.size > 10) {
      value.split(" ").foreach(out.collect)
    }
  }
}

然後我們看一下原始碼中的第二種定義:

def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {...}

與之前的不同,這裡的Lambda表示式輸入是泛型T,輸出是一個TraversableOnce[R],TraversableOnce表示這是一個R組成的列表。與之前使用Collector收集輸出不同,這裡直接輸出一個列表,Flink幫我們將列表做了展平。使用TraversableOnce也導致我們無論如何都要返回一個列表,即使是一個空列表,否則無法匹配函式的定義。總結下來,這種場景的Lambda表示式輸入是一個T,無論如何輸出都是一個R的列表,即使是一個空列表。

// 只對字串數量大於15的句子進行處理
val longSentenceWords = dataStream.flatMap {
  input => {
    if (input.size > 15) {
      // 輸出是 TraversableOnce 因此返回必須是一個列表
      // 這裡將Array[String]轉成了Seq[String]
      input.split(" ").toSeq
    } else {
      // 為空時必須返回空列表,否則返回值無法與TraversableOnce匹配!
      Seq.empty
    }
  }
}

在使用Lambda表示式時,我們應該逐漸學會使用Intellij Idea的型別檢查和匹配功能。比如在本例中,如果返回值不是一個TraversableOnce,那麼Intellij Idea會將該行標紅,告知我們輸入或輸出的型別不匹配。

此外,還有第三種只針對Scala的Lambda表示式使用方法。Flink為了保持Java和Scala API的一致性,一些Scala獨有的特性沒有被放入標準的API,而是整合到了一個擴充套件包中。這種API支援型別匹配的偏函式(Partial Function),結合case關鍵字結合,能夠在語義上更好地描述資料型別:

val data: DataStream[(String, Long, Double)] = [...]
data.flatMapWith {
  case (symbol, timestamp, price) => // ...
}

使用這種API時,需要新增引用:

import org.apache.flink.streaming.api.scala.extensions._

這種方式給輸入定義了變數名和型別,方便閱讀者閱讀程式碼,同時也保留了函數語言程式設計的簡潔。Spark的大多數運算元預設都支援此功能,對於Spark使用者來說,遷移到Flink時需要注意這個區別。此外mapWithfilterWithkeyingByreduceWith也都支援這種功能。

使用flatMapWith,之前的切詞可以實現為:

val flatMapWith = dataStream.flatMapWith {
  case (sentence: String) => {
    if (sentence.size > 15) {
      sentence.split(" ").toSeq
    } else {
      Seq.empty
    }
  }
}

Rich函式類

在上面兩種運算元自定義的基礎上,Flink還提供了Rich函式類。從名稱上來看,這種函式類在普通的函式類上增加了Rich字首,比如RichMapFunctionRichFlatMapFunctionRichReduceFunction等等。比起普通的函式類,Rich函式類增加了:

  • open()方法:Flink在運算元呼叫前會執行這個方法,可以用來進行一些初始化工作。
  • close()方法:Flink在運算元最後一次呼叫結束後執行這個方法,可以用來釋放一些資源。
  • getRuntimeContext方法:獲取執行時上下文。每個並行的運算元子任務都有一個執行時上下文,上下文記錄了這個運算元執行過程中的一些資訊,包括運算元當前的並行度、運算元子任務序號、廣播資料、累加器、監控資料。最重要的是,我們可以從上下文裡獲取狀態資料。

我們可以看一下原始碼中的函式簽名:

public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT>

它既實現了FlatMapFunction介面類,又繼承了AbstractRichFunction。其中AbstractRichFunction是一個抽象類,有一個成員變數RuntimeContext,有openclosegetRuntimeContext等方法。

我們嘗試使用FlatMapFunction並使用一個累加器。在單機環境下,我們可以用一個for迴圈做累加統計,但是在分散式計算環境下,計算是分佈在多臺節點上的,每個節點處理一部分資料,因此單純迴圈無法滿足計算,累加器是大資料框架幫我們實現的一種機制,允許我們在多節點上進行累加統計。

// 使用RichFlatMapFunction實現
// 添加了累加器 Accumulator
class WordSplitRichFlatMap(limit: Int) extends RichFlatMapFunction[String, String] {
  // 建立一個累加器
  val numOfLines: IntCounter = new IntCounter(0)

  override def open(parameters: Configuration): Unit = {
    // 在RuntimeContext中註冊累加器
    getRuntimeContext.addAccumulator("num-of-lines", this.numOfLines)
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    // 執行過程中呼叫累加器
    this.numOfLines.add(1)
    if(value.size > limit) {
      value.split(" ").foreach(out.collect)
    }
  }
}

val richFunction = dataStream.flatMap(new WordSplitRichFlatMap(10))

val jobExecuteResult = senv.execute("basic flatMap transformation")

// 執行結束後 獲取累加器的結果
val lines: Int = jobExecuteResult.getAccumulatorResult("num-of-lines")
println("num of lines: " + lines)