Flink計算支援的資料型別
阿新 • • 發佈:2020-12-23
技術標籤:大資料之Flink
Flink處理資料介紹
Flink流應用程式處理是以資料物件表示的事件流。在Flink內部,處理資料物件,通過被序列化和反序列化進行網路傳送,從狀態後端、檢查點和儲存點讀取它們。
為了有效地做到這一點,Flink需要明確知道應用程式所處理的資料型別。Flink使用型別資訊的概念來表示資料型別,併為每個資料型別生成特定的序列化器、反序列化器和比較器。
Flink還具有一個型別提取系統,該系統分析函式的輸入和返回型別,以自動獲取型別資訊,從而獲得序列化器和反序列化器。
Flink支援Java和Scala中所有常見資料型別,在某些情況下,例如lambda函式或泛型型別,需要顯式地宣告型別資訊,應用程式才能正常工作。
使用最廣泛的型別有如下幾種:
基礎資料型別
Flink支援Java、Scala所有基本資料型別,如Integer、String、Double、Long 等
java編寫示例:
DataStream<Long> stream1 = env.fromElements(1L, 2L, 3L, 4L); SingleOutputStreamOperator<Object> streamMap = stream1.map(new MapFunction<Long, Object>() { @Override public Object map(Long n) throws Exception { return n+1; } });
Java 物件(POJOs)
java編寫示例:
DataStream<Persion> stream2 = env.fromElements(new Persion("王五",25),new Persion("趙六",26)); DataStream<Persion> stream2Filter = stream2.filter(new FilterFunction<Persion>() { @Override public boolean filter(Persion persion) throws Exception { if(persion.age>25){ return true; } return false; } });
Java和Scala元組(Tuples)
java編寫示例:
DataStream<Tuple2<String, Integer>> stream3 =
stream2.flatMap(new FlatMapFunction<Persion, Tuple2<String, Integer>>() {
@Override
public void flatMap(Persion persion, Collector<Tuple2<String, Integer>> collector) throws Exception {
//形成一個元組
Tuple2<String, Integer> tp = Tuple2.of(persion.name, persion.age+1);
//將組成的Tuple放入到 Collector 集合,並輸出
collector.collect(tp);
}
});
Scala 樣例類(case classes)
示例:
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("王五", 25),
Person("趙六", 26) )
persons.filter(p => p.age > 25)
其它資料型別 Arrays, Lists, Maps, Enums等
Flink 也可支援Java 和Scala 語言中的其它資料型別,如Java語言的資料型別:ArrayList,HashMap,Enum等。