1. 程式人生 > 其它 >Flink計算支援的資料型別

Flink計算支援的資料型別

技術標籤:大資料之Flink

Flink處理資料介紹

Flink流應用程式處理是以資料物件表示的事件流。在Flink內部,處理資料物件,通過被序列化和反序列化進行網路傳送,從狀態後端、檢查點和儲存點讀取它們。

為了有效地做到這一點,Flink需要明確知道應用程式所處理的資料型別。Flink使用型別資訊的概念來表示資料型別,併為每個資料型別生成特定的序列化器、反序列化器和比較器。

Flink還具有一個型別提取系統,該系統分析函式的輸入和返回型別,以自動獲取型別資訊,從而獲得序列化器和反序列化器。

Flink支援Java和Scala中所有常見資料型別,在某些情況下,例如lambda函式或泛型型別,需要顯式地宣告型別資訊,應用程式才能正常工作。

使用最廣泛的型別有如下幾種:

基礎資料型別

Flink支援Java、Scala所有基本資料型別,如Integer、String、Double、Long 等

java編寫示例:

  DataStream<Long> stream1 = env.fromElements(1L, 2L, 3L, 4L);
        SingleOutputStreamOperator<Object> streamMap = stream1.map(new MapFunction<Long, Object>() {
            @Override
            public Object map(Long n) throws Exception {
                return n+1;
            }
        });

Java 物件(POJOs)

java編寫示例:

  DataStream<Persion> stream2 = env.fromElements(new Persion("王五",25),new Persion("趙六",26));
        DataStream<Persion> stream2Filter = stream2.filter(new FilterFunction<Persion>() {
            @Override
            public boolean filter(Persion persion) throws Exception {
                if(persion.age>25){
                    return true;
                }
                return false;
            }
        });

Java和Scala元組(Tuples)

java編寫示例:

  DataStream<Tuple2<String, Integer>> stream3 =  
               stream2.flatMap(new FlatMapFunction<Persion, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(Persion persion, Collector<Tuple2<String, Integer>> collector) throws Exception {
                //形成一個元組
                Tuple2<String, Integer> tp = Tuple2.of(persion.name, persion.age+1);
                //將組成的Tuple放入到 Collector 集合,並輸出
                collector.collect(tp);
            }
        });

Scala 樣例類(case classes)

示例:

case class Person(name: String, age: Int) 
val persons: DataStream[Person] = env.fromElements(
Person("王五", 25), 
Person("趙六", 26) )
persons.filter(p => p.age > 25)

其它資料型別 Arrays, Lists, Maps, Enums等

Flink 也可支援Java 和Scala 語言中的其它資料型別,如Java語言的資料型別:ArrayList,HashMap,Enum等。