Flink實戰案例(四十五): Operators(六)多流轉換運算元(一)Union
阿新 • • 發佈:2021-02-01
技術標籤:Flink入門
宣告:本系列部落格是根據SGG的視訊整理而成,非常適合大家入門學習。
UNION
DataStream.union()方法將兩條或者多條DataStream合併成一條具有與輸入流相同型別的輸出DataStream。接下來的轉換運算元將會處理輸入流中的所有元素。圖5-5展示了union操作符如何將黑色和白色的事件流合併成一個單一輸出流。
事件合流的方式為FIFO方式。操作符並不會產生一個特定順序的事件流。union操作符也不會進行去重。每一個輸入事件都被髮送到了下一個操作符。
說明:
1.union 合併的流的元素必須是相同的
2.union 可以合併多條流
3.union不去重,合流順序為先進先出
例項一:
下面的例子展示瞭如何將三條型別為SensorReading的資料流合併成一條流。
scala version
val parisStream: DataStream[SensorReading] = ... val tokyoStream: DataStream[SensorReading] = ... val rioStream: DataStream[SensorReading] = ... val allCities: DataStream[SensorReading] = parisStream .union(tokyoStream, rioStream)
java version
DataStream<SensorReading> parisStream = ... DataStream<SensorReading> tokyoStream = ... DataStream<SensorReading> rioStream = ... DataStream<SensorReading> allCities = parisStream .union(tokyoStream, rioStream)
例項二:
需求
本篇文章我們看一下union的用法,將多個幣種的匯率流合併成一個匯率流。
解決方案
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //USD -> CNY 匯率流 DataStreamSource<ExchangeRateInfo> usdToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.USD, CurrencyType.CNY, 7, 6),"USD-CNY"); //EUR -> CNY 匯率流 DataStreamSource<ExchangeRateInfo> eurToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.EUR, CurrencyType.CNY, 8, 7),"EUR-CNY"); //AUD -> CNY 匯率流 DataStreamSource<ExchangeRateInfo> audToCny = env.addSource(new ExchangeRateDataSource(CurrencyType.AUD, CurrencyType.CNY, 5, 4),"AUD-CNY"); //三個流合併為一個流 DataStream<ExchangeRateInfo> allExchangeRate = usdToCny.union(eurToCny).union(audToCny); //將流標準輸出 allExchangeRate.print(); env.execute("Flink Streaming Java API Skeleton"); }
執行效果
通過union函式將
USD -> CNY 匯率流
EUR -> CNY 匯率流
AUD -> CNY 匯率流
統一合併成一個匯率流
1> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.60] 1> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.74] 1> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.05] 2> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.03] 2> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.36] 2> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.83] 3> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.31] 3> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.71] 3> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.79] 4> ExchangeRateInfo [from=AUD, to=CNY, coefficient=4.08] 4> ExchangeRateInfo [from=USD, to=CNY, coefficient=6.21] 4> ExchangeRateInfo [from=EUR, to=CNY, coefficient=7.46]
小結
在sql中union是將多個查詢結果集合彙總成一個結果集合,而在流計算中類似的將多個流合併成一個流。
程式碼地址
https://github.com/chaoxxx/learn-flink-stream-api/blob/master/src/main/java/fun/cosmozhu/session14/main/StreamTest.java