Flink實戰案例(四十三): Operators(四)FLATMAP
技術標籤:Flink入門
宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。
FLATMAP
flatMap
運算元和map
運算元很類似,不同之處在於針對每一個輸入事件flatMap
可以生成0個、1個或者多個輸出元素。事實上,flatMap
轉換運算元是filter
和map
的泛化。所以flatMap
可以實現map
和filter
運算元的功能。圖5-3展示了flatMap
如何根據輸入事件的顏色來做不同的處理。如果輸入事件是白色方框,則直接輸出。輸入元素是黑框,則複製輸入。灰色方框會被過濾掉。
flatMap運算元將會應用在每一個輸入事件上面。對應的FlatMapFunction
flatMap()
方法,這個方法返回0個、1個或者多個事件到一個Collector
集合中,作為輸出結果。
// T: the type of input elements // O: the type of output elements FlatMapFunction[T, O] > flatMap(T, Collector[O]): Unit
例項一:
下面的例子展示了在資料分析教程中經常用到的例子,我們用flatMap
來實現。使用_
來切割感測器ID,比如sensor_1
。
scala version
class IdSplitter extends FlatMapFunction[String, String] { override def flatMap(id: String, out: Collector[String]) : Unit = { val arr = id.split("_") arr.foreach(out.collect) } }
匿名函式寫法
val splitIds = sensorIds .flatMap(r => r.split("_"))
java version
public static class IdSplitter implements FlatMapFunction<String, String> {
@Override
public void flatMap(String id, Collector<String> out) {
String[] splits = id.split("_");
for (String split : splits) {
}
}
}
匿名函式寫法:
DataStream<String> splitIds = sensorIds .flatMap((FlatMapFunction<String, String>) (id, out) -> { for (String s: id.split("_")) { out.collect(s);}}) // provide result type because Java cannot infer return type of lambda function // 提供結果的型別,因為Java無法推斷匿名函式的返回值型別 .returns(Types.STRING);
例項二:
函式類
對於map
、flatMap
、reduce
等方法,我們可以實現MapFunction
、FlatMapFunction
、ReduceFunction
等interface介面。這些函式類簽名中都有泛型引數,用來定義該函式的輸入或輸出的資料型別。我們要繼承這些類,並重寫裡面的自定義函式。以flatMap
對應的FlatMapFunction
為例,它在原始碼中的定義為:
public interface FlatMapFunction<T, O> extends Function, Serializable { void flatMap(T value, Collector<O> out) throws Exception; }
這是一個介面類,它繼承了Flink的Function
函式式介面。函式式介面只有一個抽象函式方法(Single Abstract Method),其目的是為了方便Java 8 Lambda表示式的使用。此外,它還繼承了Serializable
,以便進行序列化,這是因為這些函式在執行過程中要傳送到各個TaskManager上,傳送前後要進行序列化和反序列化。需要注意的是,使用這些函式時,一定要保證函式內的所有內容都可以被序列化。如果有一些不能被序列化的內容,或者使用接下來介紹的Rich函式類,或者重寫Java的序列化和反序列化方法。
進一步觀察FlatMapFunction
發現,這個這個函式有兩個泛型T和O,T是輸入,O是輸出,在使用時,要設定好對應的輸入和輸出資料型別。自定義函式最終歸結為重寫函式flatMap
,函式的兩個引數也與輸入輸出的泛型型別對應,即引數value的是flatMap
的輸入,資料型別是T,引數out是flatMap
的輸出,我們需要將型別為O的資料寫入out。
我們繼承FlatMapFunction
,並實現flatMap
,只對長度大於limit的字串切詞:
// 使用FlatMapFunction實現過濾邏輯,只對字串長度大於 limit 的內容進行切詞
class WordSplitFlatMap(limit: Int) extends FlatMapFunction[String, String] {
override def flatMap(value: String, out: Collector[String]): Unit = {
// split返回一個Array
// 將Array中的每個元素使用Collector.collect收集起來,起到將列表展平的效果
if (value.size > limit) {
value.split(" ").foreach(out.collect)
}
}
}
val dataStream: DataStream[String] = senv.fromElements("Hello World", "Hello this is Flink")
val function = dataStream.flatMap(new WordSplitFlatMap(10))
其中,Collector
起著收集輸出的作用。
Lambda表示式
當不需要處理非常複雜的業務邏輯時,使用Lambda表示式可能是更好的選擇,Lambda表示式能讓程式碼更簡潔緊湊。Java 8和Scala都對Lambda表示式支援非常好。
對於flatMap
,Flink的Scala原始碼有三種定義,我們先看一下第一種的定義:
def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R] = {...}
flatMap
輸入是泛型T,輸出是泛型R,接收一個名為fun的Lambda表示式,fun形如(T, Collector[R] => {...})
。
我們繼續以切詞為例,Lambda表示式為:
val lambda = dataStream.flatMap{
(value: String, out: Collector[String]) => {
if (value.size > 10) {
value.split(" ").foreach(out.collect)
}
}
}
然後我們看一下原始碼中的第二種定義:
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {...}
與之前的不同,這裡的Lambda表示式輸入是泛型T,輸出是一個TraversableOnce[R]
,TraversableOnce
表示這是一個R組成的列表。與之前使用Collector
收集輸出不同,這裡直接輸出一個列表,Flink幫我們將列表做了展平。使用TraversableOnce
也導致我們無論如何都要返回一個列表,即使是一個空列表,否則無法匹配函式的定義。總結下來,這種場景的Lambda表示式輸入是一個T,無論如何輸出都是一個R的列表,即使是一個空列表。
// 只對字串數量大於15的句子進行處理
val longSentenceWords = dataStream.flatMap {
input => {
if (input.size > 15) {
// 輸出是 TraversableOnce 因此返回必須是一個列表
// 這裡將Array[String]轉成了Seq[String]
input.split(" ").toSeq
} else {
// 為空時必須返回空列表,否則返回值無法與TraversableOnce匹配!
Seq.empty
}
}
}
在使用Lambda表示式時,我們應該逐漸學會使用Intellij Idea的型別檢查和匹配功能。比如在本例中,如果返回值不是一個TraversableOnce
,那麼Intellij Idea會將該行標紅,告知我們輸入或輸出的型別不匹配。
此外,還有第三種只針對Scala的Lambda表示式使用方法。Flink為了保持Java和Scala API的一致性,一些Scala獨有的特性沒有被放入標準的API,而是整合到了一個擴充套件包中。這種API支援型別匹配的偏函式(Partial Function),結合case關鍵字結合,能夠在語義上更好地描述資料型別:
val data: DataStream[(String, Long, Double)] = [...] data.flatMapWith { case (symbol, timestamp, price) => // ... }
使用這種API時,需要新增引用:
import org.apache.flink.streaming.api.scala.extensions._
這種方式給輸入定義了變數名和型別,方便閱讀者閱讀程式碼,同時也保留了函數語言程式設計的簡潔。Spark的大多數運算元預設都支援此功能,對於Spark使用者來說,遷移到Flink時需要注意這個區別。此外mapWith
、filterWith
、keyingBy
、reduceWith
也都支援這種功能。
使用flatMapWith
,之前的切詞可以實現為:
val flatMapWith = dataStream.flatMapWith {
case (sentence: String) => {
if (sentence.size > 15) {
sentence.split(" ").toSeq
} else {
Seq.empty
}
}
}
Rich函式類
在上面兩種運算元自定義的基礎上,Flink還提供了Rich函式類。從名稱上來看,這種函式類在普通的函式類上增加了Rich字首,比如RichMapFunction
、RichFlatMapFunction
或RichReduceFunction
等等。比起普通的函式類,Rich函式類增加了:
open()
方法:Flink在運算元呼叫前會執行這個方法,可以用來進行一些初始化工作。close()
方法:Flink在運算元最後一次呼叫結束後執行這個方法,可以用來釋放一些資源。getRuntimeContext
方法:獲取執行時上下文。每個並行的運算元子任務都有一個執行時上下文,上下文記錄了這個運算元執行過程中的一些資訊,包括運算元當前的並行度、運算元子任務序號、廣播資料、累加器、監控資料。最重要的是,我們可以從上下文裡獲取狀態資料。
我們可以看一下原始碼中的函式簽名:
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT>
它既實現了FlatMapFunction
介面類,又繼承了AbstractRichFunction
。其中AbstractRichFunction
是一個抽象類,有一個成員變數RuntimeContext
,有open
、close
和getRuntimeContext
等方法。
我們嘗試使用FlatMapFunction
並使用一個累加器。在單機環境下,我們可以用一個for迴圈做累加統計,但是在分散式計算環境下,計算是分佈在多臺節點上的,每個節點處理一部分資料,因此單純迴圈無法滿足計算,累加器是大資料框架幫我們實現的一種機制,允許我們在多節點上進行累加統計。
// 使用RichFlatMapFunction實現
// 添加了累加器 Accumulator
class WordSplitRichFlatMap(limit: Int) extends RichFlatMapFunction[String, String] {
// 建立一個累加器
val numOfLines: IntCounter = new IntCounter(0)
override def open(parameters: Configuration): Unit = {
// 在RuntimeContext中註冊累加器
getRuntimeContext.addAccumulator("num-of-lines", this.numOfLines)
}
override def flatMap(value: String, out: Collector[String]): Unit = {
// 執行過程中呼叫累加器
this.numOfLines.add(1)
if(value.size > limit) {
value.split(" ").foreach(out.collect)
}
}
}
val richFunction = dataStream.flatMap(new WordSplitRichFlatMap(10))
val jobExecuteResult = senv.execute("basic flatMap transformation")
// 執行結束後 獲取累加器的結果
val lines: Int = jobExecuteResult.getAccumulatorResult("num-of-lines")
println("num of lines: " + lines)