1. 程式人生 > 程式設計 >Flink 系列(四)—— Flink Data Transformation

Flink 系列(四)—— Flink Data Transformation

一、Transformations 分類

Flink 的 Transformations 操作主要用於將一個和多個 DataStream 按需轉換成新的 DataStream。它主要分為以下三類:

  • DataStream Transformations:進行資料流相關轉換操作;
  • Physical partitioning:物理分割槽。Flink 提供的底層 API ,允許使用者定義資料的分割槽規則;
  • Task chaining and resource groups:任務鏈和資源組。允許使用者進行任務鏈和資源組的細粒度的控制。

以下分別對其主要 API 進行介紹:

二、DataStream Transformations

2.1 Map [DataStream → DataStream]

對一個 DataStream 中的每個元素都執行特定的轉換操作:

DataStream<Integer> integerDataStream = env.fromElements(1,2,3,4,5);
integerDataStream.map((MapFunction<Integer,Object>) value -> value * 2).print();
// 輸出 2,4,6,8,10
複製程式碼

2.2 FlatMap [DataStream → DataStream]

FlatMap 與 Map 類似,但是 FlatMap 中的一個輸入元素可以被對映成一個或者多個輸出元素,示例如下:

String string01 = "one one one two two";
String string02 = "third third third four";
DataStream<String> stringDataStream = env.fromElements(string01,string02);
stringDataStream.flatMap(new FlatMapFunction<String,String>() {
    @Override
    public void flatMap(String value,Collector<String> out)
throws Exception
{ for (String s : value.split(" ")) { out.collect(s); } } }).print(); // 輸出每一個獨立的單詞,為節省排版,這裡去掉換行,後文亦同 one one one two two third third third four 複製程式碼

2.3 Filter [DataStream → DataStream]

用於過濾符合條件的資料:

env.fromElements(1,5).filter(x -> x > 3).print();
複製程式碼

2.4 KeyBy 和 Reduce

  • KeyBy [DataStream → KeyedStream] :用於將相同 Key 值的資料分到相同的分割槽中;
  • Reduce [KeyedStream → DataStream] :用於對資料執行歸約計算。

如下例子將資料按照 key 值分割槽後,滾動進行求和計算:

DataStream<Tuple2<String,Integer>> tuple2DataStream = env.fromElements(new Tuple2<>("a",1),new Tuple2<>("a",2),new Tuple2<>("b",3),5));
KeyedStream<Tuple2<String,Integer>,Tuple> keyedStream = tuple2DataStream.keyBy(0);
keyedStream.reduce((ReduceFunction<Tuple2<String,Integer>>) (value1,value2) ->
                   new Tuple2<>(value1.f0,value1.f1 + value2.f1)).print();

// 持續進行求和計算,輸出:
(a,1)
(a,3)
(b,8)
複製程式碼

KeyBy 操作存在以下兩個限制:

  • KeyBy 操作用於使用者自定義的 POJOs 型別時,該自定義型別必須重寫 hashCode 方法;
  • KeyBy 操作不能用於陣列型別。

2.5 Aggregations [KeyedStream → DataStream]

Aggregations 是官方提供的聚合運算元,封裝了常用的聚合操作,如上利用 Reduce 進行求和的操作也可以利用 Aggregations 中的 sum 運算元重寫為下面的形式:

tuple2DataStream.keyBy(0).sum(1).print();
複製程式碼

除了 sum 外,Flink 還提供了 min,max,minBy,maxBy 等常用聚合運算元:

// 滾動計算指定key的最小值,可以通過index或者fieldName來指定key
keyedStream.min(0);
keyedStream.min("key");
// 滾動計算指定key的最大值
keyedStream.max(0);
keyedStream.max("key");
// 滾動計算指定key的最小值,並返回其對應的元素
keyedStream.minBy(0);
keyedStream.minBy("key");
// 滾動計算指定key的最大值,並返回其對應的元素
keyedStream.maxBy(0);
keyedStream.maxBy("key");

複製程式碼

2.6 Union [DataStream* → DataStream]

用於連線兩個或者多個元素型別相同的 DataStream 。當然一個 DataStream 也可以與其本生進行連線,此時該 DataStream 中的每個元素都會被獲取兩次:

DataStreamSource<Tuple2<String,Integer>> streamSource01 = env.fromElements(new Tuple2<>("a",1),new Tuple2<>("a",2));
DataStreamSource<Tuple2<String,Integer>> streamSource02 = env.fromElements(new Tuple2<>("b",new Tuple2<>("b",2));
streamSource01.union(streamSource02);
streamSource01.union(streamSource01,streamSource02);
複製程式碼

2.7 Connect [DataStream,DataStream → ConnectedStreams]

Connect 操作用於連線兩個或者多個型別不同的 DataStream ,其返回的型別是 ConnectedStreams ,此時被連線的多個 DataStreams 可以共享彼此之間的資料狀態。但是需要注意的是由於不同 DataStream 之間的資料型別是不同的,如果想要進行後續的計算操作,還需要通過 CoMap 或 CoFlatMap 將 ConnectedStreams 轉換回 DataStream:

DataStreamSource<Tuple2<String,Integer>> streamSource01 = env.fromElements(new Tuple2<>("a",5));
DataStreamSource<Integer> streamSource02 = env.fromElements(2,9);
// 使用connect進行連線
ConnectedStreams<Tuple2<String,Integer> connect = streamSource01.connect(streamSource02);
connect.map(new CoMapFunction<Tuple2<String,Integer,Integer>() {
    @Override
    public Integer map1(Tuple2<String,Integer> value) throws Exception {
        return value.f1;
    }

    @Override
    public Integer map2(Integer value) throws Exception {
        return value;
    }
}).map(x -> x * 100).print();

// 輸出:
300 500 200 900 300
複製程式碼

2.8 Split 和 Select

  • Split [DataStream → SplitStream]:用於將一個 DataStream 按照指定規則進行拆分為多個 DataStream,需要注意的是這裡進行的是邏輯拆分,即 Split 只是將資料貼上不同的型別標籤,但最終返回的仍然只是一個 SplitStream;
  • Select [SplitStream → DataStream]:想要從邏輯拆分的 SplitStream 中獲取真實的不同型別的 DataStream,需要使用 Select 運算元,示例如下:
DataStreamSource<Integer> streamSource = env.fromElements(1,5,6,7,8);
// 標記
SplitStream<Integer> split = streamSource.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        output.add(value % 2 == 0 ? "even" : "odd");
        return output;
    }
});
// 獲取偶數資料集
split.select("even").print();
// 輸出 2,8
複製程式碼

2.9 project [DataStream → DataStream]

project 主要用於獲取 tuples 中的指定欄位集,示例如下:

DataStreamSource<Tuple3<String,String>> streamSource = env.fromElements(
                                                                         new Tuple3<>("li",22,"2018-09-23"),new Tuple3<>("ming",33,"2020-09-23"));
streamSource.project(0,2).print();

// 輸出
(li,2018-09-23)
(ming,2020-09-23)
複製程式碼

三、物理分割槽

物理分割槽 (Physical partitioning) 是 Flink 提供的底層的 API,允許使用者採用內建的分割槽規則或者自定義的分割槽規則來對資料進行分割槽,從而避免資料在某些分割槽上過於傾斜,常用的分割槽規則如下:

3.1 Random partitioning [DataStream → DataStream]

隨機分割槽 (Random partitioning) 用於隨機的將資料分佈到所有下游分割槽中,通過 shuffle 方法來進行實現:

dataStream.shuffle();
複製程式碼

3.2 Rebalancing [DataStream → DataStream]

Rebalancing 採用輪詢的方式將資料進行分割槽,其適合於存在資料傾斜的場景下,通過 rebalance 方法進行實現:

dataStream.rebalance();
複製程式碼

3.3 Rescaling [DataStream → DataStream]

當採用 Rebalancing 進行分割槽平衡時,其實現的是全域性性的負載均衡,資料會通過網路傳輸到其他節點上並完成分割槽資料的均衡。 而 Rescaling 則是低配版本的 rebalance,它不需要額外的網路開銷,它只會對上下游的運算元之間進行重新均衡,通過 rescale 方法進行實現:

dataStream.rescale();
複製程式碼

ReScale 這個單詞具有重新縮放的意義,其對應的操作也是如此,具體如下:如果上游 operation 並行度為 2,而下游的 operation 並行度為 6,則其中 1 個上游的 operation 會將元素分發到 3 個下游 operation,另 1 個上游 operation 則會將元素分發到另外 3 個下游 operation。反之亦然,如果上游的 operation 並行度為 6,而下游 operation 並行度為 2,則其中 3 個上游 operation 會將元素分發到 1 個下游 operation,另 3 個上游 operation 會將元素分發到另外 1 個下游operation:

https://github.com/heibaiying

3.4 Broadcasting [DataStream → DataStream]

將資料分發到所有分割槽上。通常用於小資料集與大資料集進行關聯的情況下,此時可以將小資料集廣播到所有分割槽上,避免頻繁的跨分割槽關聯,通過 broadcast 方法進行實現:

dataStream.broadcast();
複製程式碼

3.5 Custom partitioning [DataStream → DataStream]

Flink 執行使用者採用自定義的分割槽規則來實現分割槽,此時需要通過實現 Partitioner 介面來自定義分割槽規則,並指定對應的分割槽鍵,示例如下:

 DataStreamSource<Tuple2<String,Integer>> streamSource = env.fromElements(new Tuple2<>("Hadoop",new Tuple2<>("Spark",new Tuple2<>("Flink-streaming",new Tuple2<>("Flink-batch",4),new Tuple2<>("Storm",new Tuple2<>("HBase",3));
streamSource.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key,int numPartitions) {
        // 將第一個欄位包含flink的Tuple2分配到同一個分割槽
        return key.toLowerCase().contains("flink") ? 0 : 1;
    }
},0).print();


// 輸出如下:
1> (Flink-streaming,2)
1> (Flink-batch,4)
2> (Hadoop,1)
2> (Spark,1)
2> (Storm,4)
2> (HBase,3)
複製程式碼

四、任務鏈和資源組

任務鏈和資源組 ( Task chaining and resource groups ) 也是 Flink 提供的底層 API,用於控制任務鏈和資源分配。預設情況下,如果操作允許 (例如相鄰的兩次 map 操作) ,則 Flink 會嘗試將它們在同一個執行緒內進行,從而可以獲取更好的效能。但是 Flink 也允許使用者自己來控制這些行為,這就是任務鏈和資源組 API:

4.1 startNewChain

startNewChain 用於基於當前 operation 開啟一個新的任務鏈。如下所示,基於第一個 map 開啟一個新的任務鏈,此時前一個 map 和 後一個 map 將處於同一個新的任務鏈中,但它們與 filter 操作則分別處於不同的任務鏈中:

someStream.filter(...).map(...).startNewChain().map(...);
複製程式碼

4.2 disableChaining

disableChaining 操作用於禁止將其他操作與當前操作放置於同一個任務鏈中,示例如下:

someStream.map(...).disableChaining();
複製程式碼

4.3 slotSharingGroup

slot 是工作管理員 (TaskManager) 所擁有資源的固定子集,每個操作 (operation) 的子任務 (sub task) 都需要獲取 slot 來執行計算,但每個操作所需要資源的大小都是不相同的,為了更好地利用資源,Flink 允許不同操作的子任務被部署到同一 slot 中。slotSharingGroup 用於設定操作的 slot 共享組 (slot sharing group) ,Flink 會將具有相同 slot 共享組的操作放到同一個 slot 中 。示例如下:

someStream.filter(...).slotSharingGroup("slotSharingGroupName");
複製程式碼

參考資料

Flink Operators: ci.apache.org/projects/fl…

更多大資料系列文章可以參見 GitHub 開源專案大資料入門指南