
Flink Stream-Stream Joins Explained (Part 1)


All code in this article was tested against Flink 1.11.

Introduction

The joins discussed here are joins between two or more streams. Stream-batch joins and batch-batch joins will be covered in a separate article.

By window type, Flink's window joins fall into three kinds: Tumbling Window Join, Sliding Window Join, and Session Window Join.

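By join type, they fall into join and intervalJoin. The former is similar to an inner join in an RDBMS. An interval join joins the elements of two streams (call them A and B) on a common key, where an element of B matches an element of A only if B's timestamp lies within a time interval relative to A's timestamp.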

Inner join diagram:

Interval join diagram:

Code used in this article

Create the two data sources used for the join

package it.kenn.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;

import java.util.Random;

public class ForJoinSource1 implements SourceFunction<Tuple3<String, Long, Double>> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
        Random random = new Random();
        while (flag) {
            int randInt = random.nextInt(100);
            // Key format "S-<n>": the join key selectors split on "-" and take the number
            ctx.collect(new Tuple3<>("S-" + randInt, System.currentTimeMillis(), random.nextDouble() * 1000));
            Thread.sleep(30);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
//-----------------------------------------------------------------------------------
package it.kenn.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;

import java.util.Random;

public class ForJoinSource2 implements SourceFunction<Tuple3<String, Long, Double>> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Tuple3<String, Long, Double>> ctx) throws Exception {
        Random random = new Random();
        while (flag) {
            int randInt = random.nextInt(110);
            // Key format "S-<n>": the join key selectors split on "-" and take the number
            ctx.collect(new Tuple3<>("S-" + randInt, System.currentTimeMillis(), random.nextDouble() * 1000));
            Thread.sleep(20);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

Test main program

package it.kenn.join;

import it.kenn.source.ForJoinSource1;
import it.kenn.source.ForJoinSource2;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import scala.Tuple3;

import java.time.Duration;

/**
 * Stream-stream join tests
 */
public class StreamStreamJoinTest {

    // Test entry point
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamStreamJoinTest joinTest = new StreamStreamJoinTest();
        // Use event time (in Flink 1.11 the default is still processing time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<Tuple3<String, Long, Double>> source30 = env.addSource(new ForJoinSource1())
                // Assign timestamps and watermarks. Two things are needed: forBoundedOutOfOrderness sets the watermark strategy,
                // withTimestampAssigner says which field holds the event time
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofMillis(10)).withTimestampAssigner((e, ts) -> e._2()));
        DataStream<Tuple3<String, Long, Double>> source20 = env.addSource(new ForJoinSource2())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofMillis(10)).withTimestampAssigner((e, ts) -> e._2()));
        DataStream<Tuple3<String, Long, Double>> tumbJoinedStream = joinTest.tumbJoin(source20, source30);
        DataStream<Tuple3<String, Long, Double>> slidingJoinStream = joinTest.slidingJoin(source20, source30);
        DataStream<Tuple3<String, Long, Double>> intervalJoinStream = joinTest.intervalJoin(source20, source30);
        // Print the result of whichever join you want to test
        tumbJoinedStream.print();
        env.execute();
    }

    // The tumbJoin, slidingJoin and intervalJoin methods shown below are members of this class
}
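The watermark comment in the main program can be made concrete with a little arithmetic. The following is a plain-Java sketch, not Flink code, and it rests on a few stated assumptions: the bounded-out-of-orderness watermark is maxTimestamp - outOfOrderness - 1 (as in Flink 1.11's BoundedOutOfOrdernessWatermarks), an event-time window [start, end) fires once the watermark reaches end - 1, and the watermark is recomputed after every event (Flink actually emits it periodically, every 200 ms by default). For a join, the operator also takes the minimum of both inputs' watermarks, which this sketch ignores. The class and method names are hypothetical.

```java
/**
 * Hypothetical helper (not Flink code) that mimics the arithmetic behind
 * forBoundedOutOfOrderness(Duration.ofMillis(10)) and a 50 ms event-time tumbling window.
 */
public class WatermarkSketch {
    // Watermark after seeing maxSeenTimestamp with a given out-of-orderness bound
    static long watermark(long maxSeenTimestamp, long outOfOrdernessMillis) {
        return maxSeenTimestamp - outOfOrdernessMillis - 1;
    }

    // Start of the tumbling window [start, start + size) containing ts (offset 0)
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 50, delay = 10;
        long[] timestamps = {1000, 1020, 1049, 1059, 1060};
        long start = windowStart(timestamps[0], size); // the window of the first event
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
            long wm = watermark(max, delay);
            System.out.printf("event %d -> watermark %d, window [%d, %d) fired: %b%n",
                    ts, wm, start, start + size, fires(start + size, wm));
        }
    }
}
```

In other words, with a 10 ms bound the window [1000, 1050) only fires after an event with timestamp 1060 or later has been seen.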

Inner Tumbling Window Join test

    /**
     * Inner Tumbling Window Join
     *
     * @param source20 left stream
     * @param source30 right stream
     * @return the joined stream
     */
    public DataStream<Tuple3<String, Long, Double>> tumbJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
        DataStream<Tuple3<String, Long, Double>> joinedStream = source20.join(source30)
                .where(e -> e._1().split("-")[1])    // join key of the left stream
                .equalTo(e -> e._1().split("-")[1])  // join key of the right stream
                .window(TumblingEventTimeWindows.of(Time.milliseconds(50))) // window type and size
                // The join function. Although it is called join, it differs from a database join:
                // the logic below does not combine fields from both records; it compares the two
                // records and returns only the one with the larger value
                .apply(new JoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                    @Override
                    public Tuple3<String, Long, Double> join(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right) throws Exception {
                        return left._3() > right._3() ? left : right;
                    }
                });
        return joinedStream;
    }

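The figure below illustrates tumbling window join. The join result in it looks like a Cartesian product, and within one window and one key it is: every element of one stream is paired with every element of the other stream that has the same key and falls into the same window. The where/equalTo key selectors restrict pairing to elements with equal keys, so elements with different keys are never paired; if all elements in the figure share one key, the pairs shown are exactly what Flink emits.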

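The figure also shows that in the last window only the orange stream has data and the green stream has none, so that window produces no output: an inner join needs at least one element from each side, and the join function is never called.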
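The two observations above (per-key Cartesian product inside a window, no output for a one-sided window) can be checked with a small reference model. This is a plain-Java nested-loop sketch of the semantics, not how Flink implements it (Flink buffers elements per key and window in state and joins them when the window fires). The Event class is a hypothetical stand-in for Tuple3<String, Long, Double>, and the join logic reuses the article's "keep the record with the larger value".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TumblingJoinSketch {
    // Hypothetical stand-in for Tuple3<String, Long, Double>: key, event time, value
    static final class Event {
        final String key;
        final long ts;
        final double value;

        Event(String key, long ts, double value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "@" + ts + "=" + value;
        }
    }

    // Start of the tumbling window containing ts (offset 0)
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // One result per (left, right) pair with the same key in the same window,
    // using the article's JoinFunction logic (keep the larger value)
    static List<Event> join(List<Event> left, List<Event> right, long size) {
        List<Event> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l.value > r.value ? l : r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orange = Arrays.asList(new Event("a", 0, 1), new Event("a", 10, 5),
                new Event("b", 20, 3), new Event("a", 120, 9));
        List<Event> green = Arrays.asList(new Event("a", 30, 4), new Event("a", 40, 2),
                new Event("b", 70, 8));
        // key "a" in [0, 50): 2 x 2 pairs; "b" elements are in different windows;
        // a@120 has no green partner in [100, 150), so nothing is emitted for it
        join(orange, green, 50).forEach(System.out::println);
    }
}
```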

Sliding Join test

    /**
     * Sliding Join test
     *
     * @param source20 left stream
     * @param source30 right stream
     * @return the joined stream
     */
    public DataStream<Tuple3<String, Long, Double>> slidingJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
        DataStream<Tuple3<String, Long, Double>> joinedStream = source20.join(source30)
                .where(e -> e._1().split("-")[1])    // join key of the left stream
                .equalTo(e -> e._1().split("-")[1])  // join key of the right stream
                // sliding event-time window: size 50 ms, slide 30 ms
                .window(SlidingEventTimeWindows.of(Time.milliseconds(50), Time.milliseconds(30)))
                .apply(new JoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                    @Override
                    public Tuple3<String, Long, Double> join(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right) throws Exception {
                        return left._3() > right._3() ? left : right;
                    }
                });
        return joinedStream;
    }

Sliding join diagram:

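Sliding join has one property worth noting: an event may not be joined in one sliding window but still be joined in another. For example, orange event 2 in the figure above is not joined with anything from the green stream in the blue window, but it is joined with green event 3 in the later green window.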
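This behavior follows from how sliding windows are assigned. The plain-Java sketch below (class and method names are hypothetical) mirrors the loop I understand SlidingEventTimeWindows to use with offset 0: start from the latest slide-aligned window start at or before the timestamp, then step back by the slide while the window still covers the timestamp. With the article's size 50 ms and slide 30 ms, each element lands in one or two windows. Two records with the same key are joined once per window they share, so a pair can be emitted more than once, and two records can also never meet even though they are less than 50 ms apart.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Latest slide-aligned window start at or before ts (offset 0)
    static long lastStart(long ts, long slide) {
        return ts - (ts + slide) % slide;
    }

    // Start times of all windows [start, start + size) that contain ts
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastStart(ts, slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Windows shared by two timestamps = how many times a same-key pair is emitted
    static int sharedWindows(long ts1, long ts2, long size, long slide) {
        List<Long> first = windowStarts(ts1, size, slide);
        int count = 0;
        for (long start : windowStarts(ts2, size, slide)) {
            if (first.contains(start)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long size = 50, slide = 30;
        System.out.println(windowStarts(35, size, slide));       // windows [30, 80) and [0, 50)
        System.out.println(sharedWindows(35, 40, size, slide));  // pair emitted in two windows
        System.out.println(sharedWindows(35, 55, size, slide));  // pair emitted once
        System.out.println(sharedWindows(10, 55, size, slide));  // 45 ms apart, never joined
    }
}
```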

Session Window Join test

This kind of join does not seem to be used very often. If I ever need it, I will add notes here.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)     // key selector of the orange stream
    .equalTo(<KeySelector>)   // key selector of the green stream
    // Session gap: same-key elements more than 1 ms apart (in event time) fall into different sessions
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

Session window join diagram:

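The figure above illustrates session windows. The windows are delimited by the gaps between consecutive elements of both streams (elements from the two streams with the same key fall into the same sessions), so the number and length of the windows vary a lot. If events keep arriving less than the gap apart, the session keeps growing and the window never fires. In other words, this window fires more passively than the others: it still fires on event time via the watermark, but when a window ends is decided by the data (a pause longer than the gap) rather than by fixed time boundaries.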
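A small reference model shows how both streams feed the same sessions and why a session with elements from only one side produces nothing. This is a plain-Java sketch with hypothetical names, not Flink code. It assumes the elements of one key are already sorted by event time, uses a hypothetical gap of 5 ms (the Flink example above uses 1 ms), and treats a distance larger than the gap as the start of a new session; windows that exactly touch are merged here, which is my reading of Flink's TimeWindow.intersects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionJoinSketch {
    // Hypothetical tagged element: which stream it came from and its event time
    static final class Tagged {
        final boolean orange;
        final long ts;

        Tagged(boolean orange, long ts) {
            this.orange = orange;
            this.ts = ts;
        }
    }

    // Splits time-sorted elements of BOTH streams (same key) into sessions:
    // a new session starts when the distance to the previous element exceeds the gap
    static List<List<Tagged>> sessions(List<Tagged> sorted, long gap) {
        List<List<Tagged>> result = new ArrayList<>();
        List<Tagged> current = new ArrayList<>();
        long lastTs = 0;
        for (Tagged t : sorted) {
            if (!current.isEmpty() && t.ts - lastTs > gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
            lastTs = t.ts;
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    // Inner join inside one session: every orange element pairs with every green element
    static int joinCount(List<Tagged> session) {
        int orange = 0, green = 0;
        for (Tagged t : session) {
            if (t.orange) orange++; else green++;
        }
        return orange * green;
    }

    public static void main(String[] args) {
        long gap = 5; // hypothetical gap in ms
        List<Tagged> elements = Arrays.asList(
                new Tagged(true, 0), new Tagged(false, 1), new Tagged(true, 3), // mixed session
                new Tagged(false, 10), new Tagged(false, 12),                   // green only
                new Tagged(true, 20));                                          // orange only
        for (List<Tagged> s : sessions(elements, gap)) {
            long start = s.get(0).ts;
            long end = s.get(s.size() - 1).ts + gap; // session window is [first ts, last ts + gap)
            System.out.printf("session [%d, %d): %d joined pairs%n", start, end, joinCount(s));
        }
    }
}
```

Each new element pushes the session end forward by the gap, which is why a steady stream of closely spaced events keeps the window open.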

Interval Join test

    /**
     * Interval Join test
     *
     * @param source20 left stream
     * @param source30 right stream
     * @return the joined stream
     */
    public DataStream<Tuple3<String, Long, Double>> intervalJoin(DataStream<Tuple3<String, Long, Double>> source20, DataStream<Tuple3<String, Long, Double>> source30) {
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> intervalJoinedStream = source20.keyBy(e -> e._1().split("-")[1])
                .intervalJoin(source30.keyBy(e -> e._1().split("-")[1]))
                // a right element matches if its timestamp is in [left.ts - 12 ms, left.ts + 9 ms]
                .between(Time.milliseconds(-12), Time.milliseconds(9))
                // By default the between() bounds are inclusive; the two calls below exclude them
                .lowerBoundExclusive()
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<Tuple3<String, Long, Double>, Tuple3<String, Long, Double>, Tuple3<String, Long, Double>>() {
                    @Override
                    public void processElement(Tuple3<String, Long, Double> left, Tuple3<String, Long, Double> right, Context ctx, Collector<Tuple3<String, Long, Double>> out) throws Exception {
                        out.collect(left._3() > right._3() ? left : right);
                    }
                });
        return intervalJoinedStream;
    }

Interval join diagram:

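The figure above illustrates interval join. I once ran into the following scenario: suppose the green and yellow streams are the movement traces of two groups of people. At yellow event 2, someone enters a restaurant, and we want the green events of people who entered the same restaurant within 5 minutes before or after yellow event 2. Interval join is a good fit for this.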
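In Flink this scenario would key both streams by restaurant and call between(Time.minutes(-5), Time.minutes(5)). The matching rule itself can be written as a plain-Java nested loop, shown below as a reference sketch (not Flink's implementation, which keeps per-key state and prunes it with watermarks). The CheckIn class, its fields and the sample data are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RestaurantIntervalJoinSketch {
    // Hypothetical check-in event: person, restaurant id (the join key), event time in ms
    static final class CheckIn {
        final String person;
        final String restaurant;
        final long ts;

        CheckIn(String person, String restaurant, long ts) {
            this.person = person;
            this.restaurant = restaurant;
            this.ts = ts;
        }
    }

    // For each yellow check-in, find green check-ins at the same restaurant whose time
    // lies in [yellow.ts + lower, yellow.ts + upper], bounds inclusive like between()
    static List<String> match(List<CheckIn> yellow, List<CheckIn> green, Duration lower, Duration upper) {
        List<String> out = new ArrayList<>();
        for (CheckIn y : yellow) {
            for (CheckIn g : green) {
                boolean sameKey = y.restaurant.equals(g.restaurant);
                boolean inRange = g.ts >= y.ts + lower.toMillis() && g.ts <= y.ts + upper.toMillis();
                if (sameKey && inRange) {
                    out.add(y.person + " & " + g.person);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<CheckIn> yellow = Arrays.asList(new CheckIn("Y2", "r1", 30 * min));
        List<CheckIn> green = Arrays.asList(
                new CheckIn("G1", "r1", 26 * min),  // 4 minutes earlier: matches
                new CheckIn("G2", "r1", 36 * min),  // 6 minutes later: too late
                new CheckIn("G3", "r2", 31 * min)); // different restaurant
        System.out.println(match(yellow, green, Duration.ofMinutes(-5), Duration.ofMinutes(5)));
    }
}
```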

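One more thing to note, as stated in the code comment above and drawn in the figure: by default, between() includes both bounds. To exclude them, call lowerBoundExclusive() and upperBoundExclusive(); you can also exclude just one of the two bounds if that is what you need.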
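The effect of the two exclusive flags with the article's bounds of -12 ms and 9 ms can be shown with a tiny predicate. This is a plain-Java sketch with a hypothetical class name, modelling the documented rule that a right element matches when its timestamp lies in [left.ts + lower, left.ts + upper], with either end optionally open. For a left element at 100 ms the inclusive interval is [88, 109] and the fully exclusive one is (88, 109).

```java
public class IntervalBoundsSketch {
    // True if rightTs lies in [leftTs + lower, leftTs + upper], with either end
    // optionally excluded (like lowerBoundExclusive() / upperBoundExclusive())
    static boolean matches(long leftTs, long rightTs, long lower, long upper,
                           boolean lowerExclusive, boolean upperExclusive) {
        long lo = leftTs + lower;
        long hi = leftTs + upper;
        boolean aboveLower = lowerExclusive ? rightTs > lo : rightTs >= lo;
        boolean belowUpper = upperExclusive ? rightTs < hi : rightTs <= hi;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        long left = 100;
        for (long right : new long[]{88, 89, 109, 110}) {
            System.out.printf("right=%d inclusive=%b exclusive=%b%n", right,
                    matches(left, right, -12, 9, false, false),
                    matches(left, right, -12, 9, true, true));
        }
    }
}
```

Only the elements sitting exactly on a bound (88 and 109 here) change their result when the flags are set.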