
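Getting Started with Kafka Streams Development (10)

1. Background

The previous post introduced time windows in Kafka Streams and walked through a tumbling-window example. In this post we use the KTable abstraction in Kafka Streams to compute the average rating of a set of movies in real time.

2. Demo overview

In this post we create a Kafka topic that carries movie rating events, then write an application that keeps the current average rating of each movie up to date in real time. As before, message events are serialized with Protocol Buffers. Shown as JSON, the events look like this: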

{"movie_id": 362, "rating": 9.6}
{"movie_id": 362, "rating": 9.7}
{"movie_id": 362, "rating": 8.6}

When the Kafka Streams application processes these three events in order, it emits the following outputs one after another:

> 9.6
> 9.65
> 9.3
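
Each output follows from keeping only a count and a running sum per movie. Below is a minimal plain-Java sketch of that arithmetic, with no Kafka involved. The class and method names are hypothetical, and the printed doubles may differ from 9.65 and 9.3 in the last digits because of floating-point rounding.

```java
import java.util.ArrayList;
import java.util.List;

public class RunningAverageSketch {

    // Returns the average after each rating, keeping only a count and a sum.
    public static List<Double> runningAverages(double[] ratings) {
        List<Double> averages = new ArrayList<>();
        long count = 0L;
        double sum = 0.0;
        for (double rating : ratings) {
            count += 1;
            sum += rating;
            averages.add(sum / count);
        }
        return averages;
    }

    public static void main(String[] args) {
        // The three ratings from the example events for movie_id 362.
        for (double average : runningAverages(new double[] {9.6, 9.7, 8.6})) {
            System.out.println(average);
        }
    }
}
```

Keeping the count and the sum, rather than the average itself, is what lets each new event update the result without re-reading earlier events.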

3. Set up the project

The first step is to create the project directory:

$ mkdir aggregating-average
$ cd aggregating-average

Then, inside the new aggregating-average directory, create the Gradle build file build.gradle with the following content:

buildscript {
  repositories {
    jcenter()
  }
  dependencies {
    classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
  }
}
plugins {
  id "java"
  id "com.google.protobuf" version "0.8.12"
}
apply plugin: 'com.github.johnrengelman.shadow'
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"
repositories {
  mavenCentral()
  jcenter()
  maven { url 'https://packages.confluent.io/maven/' }
}
group 'huxihx.kafkastreams'
dependencies {
  implementation 'com.google.protobuf:protobuf-java:3.12.4'
  implementation 'org.slf4j:slf4j-simple:1.7.30'
  implementation 'org.apache.kafka:kafka-streams:2.5.0'
  implementation "com.typesafe:config:1.4.0"

  testImplementation group: 'junit', name: 'junit', version: '4.13'
}

protobuf {
  generatedFilesBaseDir = "$projectDir/src/"
  protoc {
    artifact = 'com.google.protobuf:protoc:3.12.4'
  }
}
jar {
  manifest {
    attributes(
        "Class-Path": configurations.runtime.collect { it.getName() }.join(" "),
        "Main-Class": 'huxihx.kafkastreams.RunningAverage' 
    )
  }
}
shadowJar {
  archiveFileName = "aggregating-average-standalone-$version.$extension"
}  

The main class of the application is set to huxihx.kafkastreams.RunningAverage. Save the file, then run the following command to generate the Gradle wrapper:

$ gradle wrapper

After that, create a subdirectory named configuration under aggregating-average to hold the configuration file dev.properties:

$ mkdir configuration
$ cd configuration
$ vi dev.properties

The content of dev.properties is as follows:

application.id=kafka-films
request.timeout.ms=20000
bootstrap.servers=localhost:9092
retry.backoff.ms=500
default.topic.replication.factor=1
offset.reset.policy=latest
input.ratings.topic.name=ratings
input.ratings.topic.partitions=1
input.ratings.topic.replication.factor=1
output.rating-averages.topic.name=rating-averages
output.rating-averages.topic.partitions=1
output.rating-averages.topic.replication.factor=1

Here we define one input topic, ratings, and one output topic, rating-averages. The former carries movie rating events; the latter stores the average rating of each movie.
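
The topic settings are ordinary key=value properties. The application itself loads them through Typesafe Config (via -Dconfig.file). As a rough illustration only, the sketch below parses a few of the same keys with java.util.Properties instead. ConfigSketch and its methods are hypothetical names, not part of the project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConfigSketch {

    // Parses properties text; a real run would read configuration/dev.properties instead.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Reads an integer-valued setting such as a partition count.
    public static int intProperty(Properties props, String key) {
        return Integer.parseInt(props.getProperty(key).trim());
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "input.ratings.topic.name=ratings",
                "input.ratings.topic.partitions=1",
                "output.rating-averages.topic.name=rating-averages");
        Properties props = parse(text);
        System.out.println(props.getProperty("input.ratings.topic.name"));
        System.out.println(intProperty(props, "input.ratings.topic.partitions"));
        System.out.println(props.getProperty("output.rating-averages.topic.name"));
    }
}
```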

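4. Create the message schemas

Because we serialize with Protocol Buffers, we need to generate the Java classes that model our messages ahead of time. In the aggregating-average directory, run the following commands to create the folder that holds the schemas: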

$ mkdir -p src/main/proto
$ cd src/main/proto

Then create a file named rating.proto in the proto folder with the following content:

syntax = "proto3";
   
package huxihx.kafkastreams.proto;
   
message Rating {
    int64 movie_id = 1;
    double rating = 2;
}

Next, create countsum.proto to hold the count and sum needed to compute the average:

syntax = "proto3";
   
package huxihx.kafkastreams.proto;
   
message CountAndSum {
    int64 count = 1;
    double sum = 2;
}

After saving both files, run the gradlew command in the aggregating-average directory:

$ ./gradlew build

At this point you should find the two generated Java classes, RatingOuterClass and Countsum, under src/main/java/huxihx/kafkastreams/proto in the aggregating-average directory.

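5. Create the Serdes

In this step we create Serdes for the topic messages we need. First, run the following command in the aggregating-average directory to create the package directory: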

$ mkdir -p src/main/java/huxihx/kafkastreams/serdes

In the new serdes folder, create ProtobufSerializer.java with the following content:

package huxihx.kafkastreams.serdes;
    
import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;
    
public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
    @Override
    public byte[] serialize(String topic, T data) {
        return data == null ? new byte[0] : data.toByteArray();
    }
}

Next, create ProtobufDeserializer.java:

package huxihx.kafkastreams.serdes;
    
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
    
import java.util.Map;
    
public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {
    
    private Parser<T> parser;
    
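    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The parser is passed in through the config map under the "parser" key.
        parser = (Parser<T>) configs.get("parser");
    }
    
    @Override
    public T deserialize(String topic, byte[] data) {
        // A null payload (for example a tombstone) has nothing to parse.
        if (data == null) {
            return null;
        }
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }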
}

Finally, ProtobufSerdes.java:

package huxihx.kafkastreams.serdes;
    
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
    
import java.util.HashMap;
import java.util.Map;
    
public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {
    
    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;
    
    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }
    
    @Override
    public Serializer<T> serializer() {
        return serializer;
    }
    
    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

6. Develop the main flow

Create RunningAverage.java to carry out the average-rating computation. Pay attention to how the getRatingAverageTable method computes the average.

package huxihx.kafkastreams;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import huxihx.kafkastreams.proto.Countsum;
import huxihx.kafkastreams.proto.RatingOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RunningAverage {

    private static ProtobufSerdes<RatingOuterClass.Rating> ratingSerdes() {
        return new ProtobufSerdes<>(RatingOuterClass.Rating.parser());
    }

    private static ProtobufSerdes<Countsum.CountAndSum> countAndSumSerdes() {
        return new ProtobufSerdes<>(Countsum.CountAndSum.parser());
    }

    public static void main(String[] args) throws Exception {
        new RunningAverage().runRecipe();
    }

    private Properties loadEnvProperties() {
        final Config load = ConfigFactory.load();
        final Map<String, Object> map = load.entrySet().stream()
                .filter(entry -> Stream.of("java", "user", "sun", "os", "http", "ftp", "file", "line", "awt", "gopher", "socks", "path")
                        .noneMatch(s -> entry.getKey().startsWith(s)))
                .peek(filteredEntry -> System.out.println(filteredEntry.getKey() + ": " + filteredEntry.getValue().unwrapped()))
                .collect(Collectors.toMap(Map.Entry::getKey, y -> y.getValue().unwrapped()));

        Properties props = new Properties();
        props.putAll(map);
        return props;
    }

    private void runRecipe() throws Exception {
        Properties envProps = this.loadEnvProperties();
        Properties streamProps = this.createStreamsProperties(envProps);
        Topology topology = this.buildTopology(new StreamsBuilder(), envProps);
        this.preCreateTopics(envProps);

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
            latch.await();
        } catch (Throwable e) {
            // Report the failure before exiting instead of swallowing it silently.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }

    private static KTable<Long, Double> getRatingAverageTable(KStream<Long, RatingOuterClass.Rating> ratings,
                                                              String avgRatingsTopicName,
                                                              ProtobufSerdes<Countsum.CountAndSum> countAndSumSerdes) {
        KGroupedStream<Long, Double> ratingsById = ratings
                .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating.getRating()))
                .groupByKey(Grouped.with(Serdes.Long(), Serdes.Double()));
        final KTable<Long, Countsum.CountAndSum> ratingCountAndSum =
                ratingsById.aggregate(() -> Countsum.CountAndSum.newBuilder().setCount(0L).setSum(0.0D).build(),
                        (key, value, aggregate) -> Countsum.CountAndSum.newBuilder().setCount(aggregate.getCount() + 1).setSum(aggregate.getSum() + value).build(),
                        Materialized.with(Serdes.Long(), countAndSumSerdes));
        final KTable<Long, Double> ratingAverage =
                ratingCountAndSum.mapValues(value -> value.getSum() / value.getCount(), Materialized.as("average-ratings"));
        ratingAverage.toStream().to(avgRatingsTopicName);
        return ratingAverage;
    }

    private Topology buildTopology(StreamsBuilder builder, Properties envProps) {
        final String ratingTopicName = envProps.getProperty("input.ratings.topic.name");
        final String avgRatingsTopicName = envProps.getProperty("output.rating-averages.topic.name");
        KStream<Long, RatingOuterClass.Rating> ratingStream = builder.stream(ratingTopicName,
                Consumed.with(Serdes.Long(), ratingSerdes()));
        getRatingAverageTable(ratingStream, avgRatingsTopicName, countAndSumSerdes());

        return builder.build();
    }


    private static void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        String inputTopic = envProps.getProperty("input.ratings.topic.name");
        String outputTopic = envProps.getProperty("output.rating-averages.topic.name");
        try (AdminClient client = AdminClient.create(config)) {
            Collection<TopicListing> existingTopics = client.listTopics().listings().get();

            List<NewTopic> topics = new ArrayList<>();
            List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
            if (!topicNames.contains(inputTopic))
                topics.add(new NewTopic(
                        inputTopic,
                        Integer.parseInt(envProps.getProperty("input.ratings.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.ratings.topic.replication.factor"))));

            if (!topicNames.contains(outputTopic))
                topics.add(new NewTopic(
                        outputTopic,
                        Integer.parseInt(envProps.getProperty("output.rating-averages.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.rating-averages.topic.replication.factor"))));

            if (!topics.isEmpty())
                client.createTopics(topics).all().get();
        }
    }

    private Properties createStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.putAll(envProps);

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, envProps.getProperty("default.topic.replication.factor"));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, envProps.getProperty("offset.reset.policy"));

        return props;
    }
}
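
To see what getRatingAverageTable does for each key, here is a plain-Java sketch that mimics its three steps (initializer, aggregator, mapValues) with a HashMap standing in for the KTable. It is only an illustration of the logic, not of how Kafka Streams stores state. AggregateSketch and its CountAndSum class are hypothetical stand-ins, and movie id 7 is made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class AggregateSketch {

    // Plain-Java stand-in for the generated CountAndSum protobuf message.
    public static final class CountAndSum {
        public final long count;
        public final double sum;

        public CountAndSum(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }
    }

    // Stand-in for the KTable state: the latest CountAndSum per movie id.
    private final Map<Long, CountAndSum> table = new HashMap<>();

    // Applies one rating and returns the updated average for that movie.
    public double update(long movieId, double rating) {
        // Initializer: a movie seen for the first time starts at count 0, sum 0.0.
        CountAndSum current = table.getOrDefault(movieId, new CountAndSum(0L, 0.0));
        // Aggregator: fold the new rating into the previous aggregate.
        CountAndSum next = new CountAndSum(current.count + 1, current.sum + rating);
        table.put(movieId, next);
        // mapValues: derive the average from the aggregate.
        return next.sum / next.count;
    }

    public static void main(String[] args) {
        AggregateSketch sketch = new AggregateSketch();
        System.out.println(sketch.update(362L, 9.6));
        System.out.println(sketch.update(7L, 5.0)); // a second, made-up movie id
        System.out.println(sketch.update(362L, 9.7));
        System.out.println(sketch.update(362L, 8.6));
    }
}
```

Each movie id keeps its own aggregate, so ratings for one movie never affect the average of another. In the real topology, the same separation comes from re-keying the stream by movie id before groupByKey.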

7. Write a test producer

Now create src/main/java/huxihx/kafkastreams/tests/TestProducer.java with the following code:

package huxihx.kafkastreams.tests;

import huxihx.kafkastreams.proto.RatingOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);

        try (final Producer<String, RatingOuterClass.Rating> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, RatingOuterClass.Rating> event =
                    new ProducerRecord<>("ratings", RatingOuterClass.Rating.newBuilder().setMovieId(362).setRating(Double.parseDouble(args[0])).build());
            producer.send(event, ((metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                }
            }));
        }
    }
} 

This test producer takes the movie rating as a command-line argument.

8. Test

First, build the project with the following command:

$ ./gradlew shadowJar

Then start the Kafka cluster and run the Kafka Streams application:

$ java -Dconfig.file=configuration/dev.properties -jar build/libs/aggregating-average-standalone-0.0.1.jar

Now open a terminal and start a console consumer:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --group test-group --topic rating-averages --value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer  

Because the averages are of type Double, the console consumer must use DoubleDeserializer as its value deserializer.
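
The averages reach the topic as binary doubles rather than text, so a string deserializer would print unreadable bytes. Here is a small sketch of that encoding, assuming the 8-byte big-endian layout that Kafka's built-in double serializer uses. DoubleBytesSketch is a hypothetical name, and the code relies only on java.nio.

```java
import java.nio.ByteBuffer;

public class DoubleBytesSketch {

    // Encodes a double as 8 big-endian bytes (ByteBuffer's default byte order).
    public static byte[] encode(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    // Decodes 8 big-endian bytes back into a double.
    public static double decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getDouble();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(9.65);
        System.out.println(bytes.length);  // prints 8
        System.out.println(decode(bytes)); // prints 9.65
    }
}
```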

Next, open a terminal in the aggregating-average directory and run TestProducer several times to produce movie ratings:

$ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 9.6
$ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 9.7
$ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 8.6

Back in the console consumer terminal, you should now see the following output:

9.6
9.65
9.3  

This shows that the Kafka Streams application correctly computes the average movie rating in real time.