1. 程式人生 > 其它 >Kafka Stream之Hopping、Tumbling 、Session視窗

Kafka Stream之Hopping、Tumbling 、Session視窗

技術標籤:kafkaKafka Stream視窗

Kafka Stream視窗


流式資料是在時間上無界的資料。而聚合操作只能作用在特定的資料集,也即有界的資料集上。因此需要通過某種方式從無界的資料集上按特定的語義選取出有界的資料。視窗是一種非常常用的設定計算邊界的方式。不同的流式處理系統支援的視窗類似,但不盡相同。

Kafka Stream支援的視窗如下:

(1)Hopping Time Window

該視窗定義如下圖所示。它有兩個屬性,一個是Window size,一個是Advance interval。Window size指定了視窗的大小,也即每次計算的資料集的大小。而Advance interval定義輸出的時間間隔。一個典型的應用場景是,每隔5秒鐘輸出一次過去1個小時內網站的PV或者UV。
在這裡插入圖片描述

(2)Tumbling Time Window

該視窗定義如下圖所示。可以認為它是Hopping Time Window的一種特例,也即Window size和Advance interval相等。它的特點是各個Window之間完全不相交。
在這裡插入圖片描述

(3)Sliding Window

該視窗只用於2個KStream進行Join計算時。該視窗的大小定義了Join兩側KStream的資料記錄被認為在同一個視窗的最大時間差。假設該視窗的大小為5秒,則參與Join的2個KStream中,記錄時間差小於5的記錄被認為在同一個視窗中,可以進行Join計算。

(4)Session Window

該視窗用於對Key做Group後的聚合操作中。它需要對Key做分組,然後對組內的資料根據業務需求定義一個視窗的起始點和結束點。一個典型的案例是,希望通過Session Window計算某個使用者訪問網站的時間。對於一個特定的使用者(用Key表示)而言,當發生登入操作時,該使用者(Key)的視窗即開始,當發生退出操作或者超時時,該使用者(Key)的視窗即結束。視窗結束時,可計算該使用者的訪問時間或者點選次數等。

(5)Hopping Time+Tumbling Time+Session Window 程式碼

package cn.bright.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Author Bright
 * @Date 2020/12/16
 * @Description
 */
public class WindowStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        //這裡的每次執行不同視窗功能時 ID需要每次變更
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"WindowStream3");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.116.60:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Object, Object> source = builder.stream("windowdemo");
        source.flatMapValues(value-> Arrays.asList(value.toString().split("\\s+")))
                .map((x,y)-> {
                    return new KeyValue<String, String>(y, "1");
                }).groupByKey()
                //切換成Tumbing Time Window 功能
//                .windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()))
                //切換成Hopping Time Window 功能
              //.windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()).advanceBy(Duration.ofSeconds(1).toMillis()))
               
 //切換成 Session Window 功能
                .windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis()))

                .count().toStream().foreach((x,y)->{
            System.out.println("x:"+x+" y: "+y);
        });

        final Topology topo =builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream"){

            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}