Flink Stream-Stream Joins Explained (Part 1)
This article was tested against Flink 1.11.
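Preface
The joins discussed here are joins between two or more streams. Stream-batch and batch-batch joins will be covered in a separate article.
By window type, Flink joins fall into three groups: Tumbling Window Join, Sliding Window Join, and Session Window Join.
By join type, they fall into join and intervalJoin. The former resembles an inner join in an RDBMS. An interval join connects elements of two streams (call them A and B) on a common key, where the timestamp of the element from B must lie within a relative time interval around the timestamp of the element from A.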
Inner join diagram:
Interval join diagram:
This article builds on:
- The article "Flink 1.11 event time, watermark, and WatermarkStrategy: a detailed worked example".
- The article "Flink window functions and common window operators".
Code
Creating the two data sources used for the join
package it.kenn.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;

import java.util.Random;

// Emits ("S" + 0..99, current time in ms, random value in [0, 1000)) roughly every 30 ms.
public class ForJoinSource1 implements SourceFunction<Tuple3<String, Long, Double>> {
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
        Random random = new Random();
        while (flag) {
            int randInt = random.nextInt(100);
            ctx.collect(new Tuple3<>("S" + randInt, System.currentTimeMillis(), random.nextDouble() * 1000));
            Thread.sleep(30);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
//-----------------------------------------------------------------------------------
package it.kenn.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;

import java.util.Random;

// Emits ("S" + 0..109, current time in ms, random value in [0, 1000)) roughly every 20 ms.
public class ForJoinSource2 implements SourceFunction<Tuple3<String, Long, Double>> {
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
        Random random = new Random();
        while (flag) {
            int randInt = random.nextInt(110);
            ctx.collect(new Tuple3<>("S" + randInt, System.currentTimeMillis(), random.nextDouble() * 1000));
            Thread.sleep(20);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
Main test program
package it.kenn.join;

import it.kenn.source.ForJoinSource1;
import it.kenn.source.ForJoinSource2;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import scala.Tuple3;

import java.time.Duration;

/**
 * Stream-stream join tests.
 */
public class StreamStreamJoinTest {

    // Main test program
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamStreamJoinTest joinTest = new StreamStreamJoinTest();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<Tuple3<String, Long, Double>> source30 = env.addSource(new ForJoinSource1())
                // Assign timestamps and watermarks. Two things must be specified:
                // forBoundedOutOfOrderness sets the watermark generation strategy,
                // and withTimestampAssigner says which field holds the event time.
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofMillis(10))
                        .withTimestampAssigner((e, ts) -> e._2()));
        DataStream<Tuple3<String, Long, Double>> source20 = env.addSource(new ForJoinSource2())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofMillis(10))
                        .withTimestampAssigner((e, ts) -> e._2()));
        DataStream<Tuple3<String, Long, Double>> tumbJoinedStream = joinTest.tumbJoin(source20, source30);
        DataStream<Tuple3<String, Long, Double>> slidingJoinStream = joinTest.slidingJoin(source20, source30);
        DataStream<Tuple3<String, Long, Double>> intervalJoinStream = joinTest.intervalJoin(source20, source30);
        // Test the different joins by printing the stream of interest
        tumbJoinedStream.print();
        env.execute();
    }

    // tumbJoin, slidingJoin and intervalJoin from the following sections are members of this class.
}
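To make the watermark setting above more concrete, here is a minimal plain-Java sketch of how a bounded-out-of-orderness watermark advances and when an event-time window becomes eligible to fire. It does not use Flink; the class and method names are hypothetical, and the formula (highest timestamp seen, minus the bound, minus 1 ms) reflects my understanding of Flink's built-in generator rather than a copy of it.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark (hypothetical names, no Flink dependency). */
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that computing the first watermark cannot overflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Records the event time of an incoming element. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = highest timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** An event-time window [start, end) may fire once the watermark has reached end - 1. */
    static boolean windowCanFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(10);
        for (long ts : new long[] {100, 95, 120}) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + wm.currentWatermark());
        }
    }
}
```

With a 10 ms bound, the late event at 95 does not move the watermark backwards, and a window ending at 100 only becomes eligible after an event with a timestamp of at least 110 has been seen.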
Inner Tumbling Window Join test code
/**
 * Inner Tumbling Window Join
 *
 * @param source20 left stream (one element roughly every 20 ms)
 * @param source30 right stream (one element roughly every 30 ms)
 * @return the joined stream
 */
public DataStream<Tuple3<String, Long, Double>> tumbJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
    DataStream<Tuple3<String, Long, Double>> joinedStream = source20.join(source30)
            // Join key of the left stream. The sources emit keys such as "S42" with no "-",
            // so the original split("-")[1] would throw ArrayIndexOutOfBoundsException.
            .where(e -> e._1())
            // Join key of the right stream
            .equalTo(e -> e._1())
            // Window type and window size
            .window(TumblingEventTimeWindows.of(Time.milliseconds(50)))
            // Join function. Unlike a typical database join, the logic below does not combine fields
            // from both sides; it compares the two values and returns the element with the larger one.
            .apply(new JoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                @Override
                public Tuple3<String, Long, Double> join(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right) throws Exception {
                    return left._3() > right._3() ? left : right;
                }
            });
    return joinedStream;
}
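The diagram below illustrates the tumbling window join. Its output can look like a Cartesian product, which is a little misleading: elements are paired only when they fall into the same window and have the same key under the where/equalTo condition, so elements with different keys are never combined. Within one window, elements that do share a key are still combined pairwise.
The diagram makes one more point: in the last window only the orange stream has data and the green stream has none, so the join emits nothing for that window.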
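The two observations above can be reproduced with a small plain-Java sketch of the tumbling-window inner-join semantics. It is not how Flink implements the join, only a nested loop that yields the same pairs; the `Event` class, the method names, and the output string format are all made up for illustration, and timestamps are assumed to be non-negative with a window offset of 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of tumbling-window inner-join semantics (hypothetical names, no Flink dependency). */
class TumblingJoinSketch {
    /** Minimal stand-in for the article's Tuple3<String, Long, Double>. */
    static final class Event {
        final String key;
        final long ts;
        final double value;

        Event(String key, long ts, double value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    /** Start of the tumbling window that contains ts (offset 0, non-negative timestamps assumed). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Pairs every left event with every right event that has the same key and the same window. */
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l.key + "@" + windowStart(l.ts, size) + ":" + l.ts + "," + r.ts);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(
                new Event("S1", 3, 1.0), new Event("S1", 7, 2.0), new Event("S1", 120, 3.0));
        List<Event> right = Arrays.asList(
                new Event("S1", 5, 4.0), new Event("S2", 8, 5.0));
        join(left, right, 50).forEach(System.out::println);
    }
}
```

In the sample data, the left event at 120 ms sits in a window with no right-side element and therefore produces no output, and the right event with key S2 never pairs with an S1 element even though it shares a window with two of them.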
Sliding Window Join test
/**
 * Sliding Window Join test
 *
 * @param source20 left stream (one element roughly every 20 ms)
 * @param source30 right stream (one element roughly every 30 ms)
 * @return the joined stream
 */
public DataStream<Tuple3<String, Long, Double>> slidingJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
    DataStream<Tuple3<String, Long, Double>> joinedStream = source20.join(source30)
            // Join key of the left stream. The sources emit keys such as "S42" with no "-",
            // so the original split("-")[1] would throw ArrayIndexOutOfBoundsException.
            .where(e -> e._1())
            // Join key of the right stream
            .equalTo(e -> e._1())
            // Window type: 50 ms window size, 30 ms slide
            .window(SlidingEventTimeWindows.of(Time.milliseconds(50), Time.milliseconds(30)))
            .apply(new JoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                @Override
                public Tuple3<String, Long, Double> join(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right) throws Exception {
                    return left._3() > right._3() ? left : right;
                }
            });
    return joinedStream;
}
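Sliding join diagram:
The sliding join has one property worth noting: an event may go unjoined in one sliding window yet be joined in another. In the diagram above, for example, orange event 2 is not joined with the green stream in the blue window, but it is joined with green event 3 in the later green window.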
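The reason an element can miss a partner in one window and find it in another is that each element belongs to several overlapping windows. The sketch below computes, in plain Java, which sliding windows contain a given timestamp, using the 50 ms size and 30 ms slide from the code above. The class and method names are hypothetical, and a window offset of 0 is assumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding-window assignment (hypothetical names, no Flink dependency). */
class SlidingWindowSketch {
    /** Start timestamps of all sliding windows [start, start + size) that contain ts (offset 0). */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after ts.
        long lastStart = ts - Math.floorMod(ts, slide);
        // Walk backwards one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** True if the two timestamps fall into at least one common sliding window. */
    static boolean shareWindow(long tsA, long tsB, long size, long slide) {
        List<Long> common = new ArrayList<>(windowStarts(tsA, size, slide));
        common.retainAll(windowStarts(tsB, size, slide));
        return !common.isEmpty();
    }

    public static void main(String[] args) {
        for (long ts : new long[] {55, 70, 100}) {
            System.out.println("ts=" + ts + " window starts=" + windowStarts(ts, 50, 30));
        }
    }
}
```

With these settings an element at 70 ms lies in the windows starting at 30 and 60. It shares the first with an element at 55 ms and the second with an element at 100 ms, while 55 and 100 never meet in any window.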
Session Window Join test
This approach does not seem to be used very often. If I end up using it one day, I will add notes here.
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    // Session gap
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
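Session window join diagram:
The diagram above illustrates the session window. Windows are delimited by pauses in the two streams, so the number of windows is highly unpredictable. If the spacing between events always stays below the configured gap, the window never closes and is never triggered. In other words, compared with the other window types, triggering is passive: it is driven entirely by the data rather than by time.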
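As a rough illustration of how the gap shapes the windows, the following plain-Java sketch groups a finite list of timestamps into sessions. It is a simplification under stated assumptions: the names are hypothetical, the input is a batch rather than a stream, and a new session starts only when the pause is strictly greater than the gap (how Flink treats a pause of exactly the gap is not covered here).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of session grouping by gap (hypothetical names, no Flink dependency). */
class SessionWindowSketch {
    /** Groups timestamps into sessions; a new session starts when the pause exceeds gap. */
    static List<List<Long>> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            // Compare with the most recent event of the session being built.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            // In a real stream this last session would stay open until a long enough pause occurs.
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] merged = {1, 2, 3, 10, 11, 30}; // timestamps of both streams for one key
        System.out.println(sessions(merged, 5));
        System.out.println(sessions(merged, 100));
    }
}
```

With a gap of 5 the sample splits into three sessions, and with a gap of 100 everything collapses into a single session that, in a live stream, would stay open for as long as events keep arriving.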
Interval Join test
/**
 * Interval Join test
 *
 * @param source20 left stream (one element roughly every 20 ms)
 * @param source30 right stream (one element roughly every 30 ms)
 * @return the joined stream
 */
public DataStream<Tuple3<String, Long, Double>> intervalJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
    // Key both streams by the first field. The sources emit keys such as "S42" with no "-",
    // so the original split("-")[1] would throw ArrayIndexOutOfBoundsException.
    SingleOutputStreamOperator<Tuple3<String, Long, Double>> intervalJoinedStream = source20.keyBy(e -> e._1())
            .intervalJoin(source30.keyBy(e -> e._1()))
            // A right element matches if its timestamp lies between left - 12 ms and left + 9 ms
            .between(Time.milliseconds(-12), Time.milliseconds(9))
            // By default the bounds of between() are inclusive. The two calls below make them exclusive.
            .lowerBoundExclusive()
            .upperBoundExclusive()
            .process(new ProcessJoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                @Override
                public void processElement(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right, Context ctx, Collector<Tuple3<String, Long, Double>> out) throws Exception {
                    out.collect(left._3() > right._3() ? left : right);
                }
            });
    return intervalJoinedStream;
}
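Interval join diagram:
The diagram above illustrates the interval join. I once ran into a scenario like this: suppose the green and yellow streams are the movement tracks of two groups of people. At yellow event 2, someone enters a restaurant, and we want the green events that enter the same restaurant within 5 minutes before or after yellow event 2. An interval join fits this well.
One more point, already stated in the code comment and shown in the diagram: by default the bounds of between() are inclusive. To exclude them, call the two methods shown above. You can also exclude just one of the two bounds as needed.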
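The matching rule and the effect of the two exclusive-bound calls can be written down as a small plain-Java predicate. This is a sketch of the condition only, not of Flink's buffering or state handling, and the class and method names are hypothetical. Timestamps are in milliseconds.

```java
/** Plain-Java sketch of the interval-join time condition (hypothetical names, no Flink dependency). */
class IntervalJoinBoundsSketch {
    /**
     * True if rightTs lies in the interval [leftTs + lower, leftTs + upper],
     * with either end optionally made exclusive.
     */
    static boolean matches(long leftTs, long rightTs, long lower, long upper,
                           boolean lowerExclusive, boolean upperExclusive) {
        long low = leftTs + lower;
        long high = leftTs + upper;
        boolean aboveLower = lowerExclusive ? rightTs > low : rightTs >= low;
        boolean belowUpper = upperExclusive ? rightTs < high : rightTs <= high;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Bounds from the article's code: between(-12 ms, 9 ms), left element at t = 100.
        System.out.println("inclusive, right=88: " + matches(100, 88, -12, 9, false, false));
        System.out.println("exclusive, right=88: " + matches(100, 88, -12, 9, true, true));
        // Restaurant scenario: within 5 minutes before or after the left event.
        long fiveMinutes = 5 * 60 * 1000L;
        System.out.println("restaurant, 4 min later: "
                + matches(0, 4 * 60 * 1000L, -fiveMinutes, fiveMinutes, false, false));
    }
}
```

A right element exactly 12 ms before the left element matches with the default inclusive bounds and stops matching once the lower bound is made exclusive, which is the only difference the two extra calls make.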