1. 程式人生 > 其它 >Flink實戰案例(四十五): Operators(六)多流轉換運算元(一)Union

Flink實戰案例(四十五): Operators(六)多流轉換運算元(一)Union

技術標籤:Flink入門

宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。

《2021年最新版大資料面試題全面開啟更新》

UNION

DataStream.union()方法將兩條或者多條DataStream合併成一條具有與輸入流相同型別的輸出DataStream。接下來的轉換運算元將會處理輸入流中的所有元素。圖5-5展示了union操作符如何將黑色和白色的事件流合併成一個單一輸出流。

事件合流的方式為FIFO方式。操作符並不會產生一個特定順序的事件流。union操作符也不會進行去重。每一個輸入事件都被髮送到了下一個操作符。

說明:

1.union 合併的流的元素必須是相同的

2.union 可以合併多條流

3.union不去重,合流順序為先進先出

例項一:

下面的例子展示瞭如何將三條型別為SensorReading的資料流合併成一條流。

scala version

val parisStream: DataStream[SensorReading] = ...
val tokyoStream: DataStream[SensorReading] = ...
val rioStream: DataStream[SensorReading] = ...
val allCities: DataStream[SensorReading] = parisStream
  .union(tokyoStream, rioStream)

java version

DataStream<SensorReading> parisStream = ...
DataStream<SensorReading> tokyoStream = ...
DataStream<SensorReading> rioStream = ...
DataStream<SensorReading> allCities = parisStream
  .union(tokyoStream, rioStream)

例項二:

需求

本篇文章我們看一下union的用法,將多個幣種的匯率流合併成一個匯率流。

解決方案

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //USD -> CNY 匯率流
        DataStreamSource<ExchangeRateInfo> usdToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.USD, CurrencyType.CNY, 7, 6),"USD-CNY");
        //EUR -> CNY 匯率流
        DataStreamSource<ExchangeRateInfo> eurToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.EUR, CurrencyType.CNY, 8, 7),"EUR-CNY");
        //AUD -> CNY 匯率流
        DataStreamSource<ExchangeRateInfo> audToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.AUD, CurrencyType.CNY, 5, 4),"AUD-CNY");
        //三個流合併為一個流
        DataStream<ExchangeRateInfo> allExchangeRate = usdToCny.union(eurToCny).union(audToCny);
        //將流標準輸出
        allExchangeRate.print();

        env.execute("Flink Streaming Java API Skeleton");
    }

執行效果

通過union函式將
USD -> CNY 匯率流
EUR -> CNY 匯率流
AUD -> CNY 匯率流

統一合併成一個匯率流

1> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.60]
1> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.74]
1> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.05]
2> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.03]
2> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.36]
2> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.83]
3> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.31]
3> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.71]
3> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.79]
4> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.08]
4> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.21]
4> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.46]

小結

在sql中union是將多個查詢結果集合彙總成一個結果集合,而在流計算中類似的將多個流合併成一個流。

程式碼地址

https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session14/main/StreamTest.java