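Getting Started with Kafka Streams Development (10)
1. Background
The previous post introduced time windows in Kafka Streams and walked through a Tumbling Window example. In this post we use the KTable abstraction in Kafka Streams to compute the average rating of a set of movies in real time.
2. What the demo does
In this post we create a Kafka topic that carries movie rating events, then write a program that keeps each movie's current average rating up to date in real time. As before, we serialize the events with Protocol Buffers. Shown as JSON, the events look like this: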
{"movie_id": 362, "rating": 9.6}
{"movie_id": 362, "rating": 9.7}
{"movie_id": 362, "rating": 8.6}
When the Kafka Streams program processes these 3 events in order, it emits one updated average per event:
> 9.6
> 9.65
> 9.3
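As a quick check of these three values, the running average after the n-th event is the sum of the ratings seen so far divided by their count. The symbols r_i, s_n and c_n are notation introduced here for illustration; they are not names from the program.

```latex
\[
\begin{aligned}
\bar{r}_n &= \frac{s_n}{c_n}, \qquad s_n = \sum_{i=1}^{n} r_i, \quad c_n = n \\
\bar{r}_1 &= \frac{9.6}{1} = 9.6 \\
\bar{r}_2 &= \frac{9.6 + 9.7}{2} = \frac{19.3}{2} = 9.65 \\
\bar{r}_3 &= \frac{9.6 + 9.7 + 8.6}{3} = \frac{27.9}{3} = 9.3
\end{aligned}
\]
```

These are exact decimal results. A program that accumulates the sum as a double may print a value that differs from them in the last digit.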
3. Set up the project
The first step is to create the project directory:
$ mkdir aggregating-average
$ cd aggregating-average
Then, inside the newly created aggregating-average directory, create the Gradle build file build.gradle with the following content:
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "com.google.protobuf" version "0.8.12"
}

apply plugin: 'com.github.johnrengelman.shadow'

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()
    jcenter()
    maven {
        url 'https://packages.confluent.io/maven/'
    }
}

group 'huxihx.kafkastreams'

dependencies {
    implementation 'com.google.protobuf:protobuf-java:3.12.4'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
    implementation 'org.apache.kafka:kafka-streams:2.5.0'
    implementation "com.typesafe:config:1.4.0"
    testCompile group: 'junit', name: 'junit', version: '4.13'
}

protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = 'com.google.protobuf:protoc:3.12.4'
    }
}

jar {
    manifest {
        attributes(
            "Class-Path": configurations.runtime.collect { it.getName() }.join(" "),
            "Main-Class": 'huxihx.kafkastreams.RunningAverage'
        )
    }
}

shadowJar {
    archiveFileName = "aggregating-average-standalone-$version.$extension"
}
We set the application's main class to huxihx.kafkastreams.RunningAverage. Save the file, then run the following command to generate the Gradle wrapper:
$ gradle wrapper
Once that is done, create a subdirectory named configuration under aggregating-average to hold our parameter file, dev.properties:
$ mkdir configuration
$ cd configuration
$ vi dev.properties
The content of dev.properties is:
application.id=kafka-films
request.timeout.ms=20000
bootstrap.servers=localhost:9092
retry.backoff.ms=500
default.topic.replication.factor=1
offset.reset.policy=latest
input.ratings.topic.name=ratings
input.ratings.topic.partitions=1
input.ratings.topic.replication.factor=1
output.rating-averages.topic.name=rating-averages
output.rating-averages.topic.partitions=1
output.rating-averages.topic.replication.factor=1
Here we define one input topic, ratings, and one output topic, rating-averages. The former carries the movie rating events; the latter stores each movie's average rating.
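To illustrate how these keys are consumed, the following is a minimal JDK-only sketch that parses properties text and looks up a topic name and its partition count. The application itself loads the file through Typesafe Config rather than this code; DevPropertiesSketch and its require helper are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch only: reads a few dev.properties keys with the JDK's Properties class.
final class DevPropertiesSketch {

    // Parses properties text; a real run would read configuration/dev.properties instead.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Hypothetical helper: fail fast when a required key is absent.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "application.id=kafka-films",
                "bootstrap.servers=localhost:9092",
                "input.ratings.topic.name=ratings",
                "input.ratings.topic.partitions=1",
                "output.rating-averages.topic.name=rating-averages");
        Properties props = parse(text);
        System.out.println(require(props, "input.ratings.topic.name"));
        System.out.println(Integer.parseInt(require(props, "input.ratings.topic.partitions")));
    }
}
```

Failing fast on a missing key is a design choice of this sketch; it surfaces a typo in the file at startup instead of as a null topic name later.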
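4. Create the message schemas
Because we serialize with Protocol Buffers, we need to generate the Java classes that model the messages ahead of time. Run the following commands in the aggregating-average directory to create the folder that holds the schemas: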
$ mkdir -p src/main/proto
$ cd src/main/proto
Then create a file named rating.proto in the proto folder with the following content:
syntax = "proto3";

package huxihx.kafkastreams.proto;

message Rating {
    int64 movie_id = 1;
    double rating = 2;
}
Next, create countsum.proto, which holds the count and sum needed to compute the average:
syntax = "proto3";

package huxihx.kafkastreams.proto;

message CountAndSum {
    int64 count = 1;
    double sum = 2;
}
After saving both files, run gradlew from the aggregating-average directory:
$ ./gradlew build
At this point you should see two generated Java classes, RatingOuterClass and Countsum, under src/main/java/huxihx/kafkastreams/proto in the aggregating-average directory.
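To make concrete what a serialized Rating looks like, here is a JDK-only sketch that hand-encodes the two fields following the protobuf encoding rules: each field starts with a tag byte (field number shifted left by 3, combined with the wire type), int64 is written as a varint, and double as 8 little-endian bytes. RatingWireFormatSketch and its methods are illustrative names that are not part of the project, and the default-value check is simplified; real code should keep using the generated classes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch only: hand-encodes Rating{movie_id, rating} in the protobuf wire format.
final class RatingWireFormatSketch {

    // Writes a base-128 varint: 7 payload bits per byte, high bit set when more bytes follow.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    static byte[] encodeRating(long movieId, double rating) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (movieId != 0) {                  // proto3 leaves default values off the wire
            out.write((1 << 3) | 0);         // field 1, wire type 0 (varint)
            writeVarint(out, movieId);
        }
        if (rating != 0.0) {
            out.write((2 << 3) | 1);         // field 2, wire type 1 (fixed 64-bit)
            byte[] bits = ByteBuffer.allocate(8)
                    .order(ByteOrder.LITTLE_ENDIAN)
                    .putDouble(rating)
                    .array();
            out.write(bits, 0, bits.length);
        }
        return out.toByteArray();
    }

    // Renders bytes as lowercase hex for inspection.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(encodeRating(362L, 9.6)));
    }
}
```

For movie_id 362 and rating 9.6 the result is 12 bytes: 08 ea 02 for the first field, then 11 followed by the eight bytes of the double for the second.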
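5. Create the Serdes
In this step we create the Serdes for the topic messages. First, run the following command in the aggregating-average directory to create the package directory: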
$ mkdir -p src/main/java/huxihx/kafkastreams/serdes
In the new serdes folder, create ProtobufSerializer.java with the following content:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;

public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {

    @Override
    public byte[] serialize(String topic, T data) {
        // A null message is written as an empty byte array.
        return data == null ? new byte[0] : data.toByteArray();
    }
}
Next, create ProtobufDeserializer.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {

    private Parser<T> parser;

    // The protobuf parser is passed in through the config map under the key "parser".
    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        parser = (Parser<T>) configs.get("parser");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }
}
Finally, ProtobufSerdes.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.HashMap;
import java.util.Map;

public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        // Hand the parser to the deserializer through its configure() hook.
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
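The three classes above follow one pattern: a Serde is simply a serializer and a deserializer bundled together, so that a value written to a topic can be read back unchanged. The sketch below shows that shape with the JDK only. SimpleSerde and CountAndSum are stand-ins invented for illustration, and the fixed 16-byte layout is an assumption of this sketch, not the protobuf encoding the project uses.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Sketch only: a JDK-only stand-in for the Serde idea (no Kafka or protobuf classes).
final class SerdeShapeSketch {

    // Stand-in for the generated CountAndSum message.
    static final class CountAndSum {
        final long count;
        final double sum;

        CountAndSum(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }
    }

    // Bundles both directions of the conversion, which is the role Serde<T> plays in Kafka.
    static final class SimpleSerde<T> {
        private final Function<T, byte[]> serializer;
        private final Function<byte[], T> deserializer;

        SimpleSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        byte[] serialize(T value) {
            return serializer.apply(value);
        }

        T deserialize(byte[] bytes) {
            return deserializer.apply(bytes);
        }
    }

    // Illustrative encoding: 8 bytes for count followed by 8 bytes for sum.
    static SimpleSerde<CountAndSum> countAndSumSerde() {
        return new SimpleSerde<CountAndSum>(
                value -> ByteBuffer.allocate(16).putLong(value.count).putDouble(value.sum).array(),
                bytes -> {
                    ByteBuffer buffer = ByteBuffer.wrap(bytes);
                    return new CountAndSum(buffer.getLong(), buffer.getDouble());
                });
    }

    public static void main(String[] args) {
        SimpleSerde<CountAndSum> serde = countAndSumSerde();
        CountAndSum back = serde.deserialize(serde.serialize(new CountAndSum(2L, 19.3)));
        System.out.println(back.count + " " + back.sum);
    }
}
```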
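6. Write the main program
Create RunningAverage.java to compute the average ratings. Pay particular attention to the getRatingAverageTable method: it re-keys each event by movie ID, aggregates a count and a sum per movie into a KTable, and then derives the average with mapValues.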
package huxihx.kafkastreams;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import huxihx.kafkastreams.proto.Countsum;
import huxihx.kafkastreams.proto.RatingOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RunningAverage {

    private static ProtobufSerdes<RatingOuterClass.Rating> ratingSerdes() {
        return new ProtobufSerdes<>(RatingOuterClass.Rating.parser());
    }

    private static ProtobufSerdes<Countsum.CountAndSum> countAndSumSerdes() {
        return new ProtobufSerdes<>(Countsum.CountAndSum.parser());
    }

    public static void main(String[] args) throws Exception {
        new RunningAverage().runRecipe();
    }

    // Loads the configuration and drops JVM/system entries so only our own keys remain.
    private Properties loadEnvProperties() {
        final Config load = ConfigFactory.load();
        final Map<String, Object> map = load.entrySet().stream()
                .filter(entry -> Stream.of("java", "user", "sun", "os", "http", "ftp", "file",
                                "line", "awt", "gopher", "socks", "path")
                        .noneMatch(s -> entry.getKey().startsWith(s)))
                .peek(filteredEntry -> System.out.println(
                        filteredEntry.getKey() + ": " + filteredEntry.getValue().unwrapped()))
                .collect(Collectors.toMap(Map.Entry::getKey, y -> y.getValue().unwrapped()));
        Properties props = new Properties();
        props.putAll(map);
        return props;
    }

    private void runRecipe() throws Exception {
        Properties envProps = this.loadEnvProperties();
        Properties streamProps = this.createStreamsProperties(envProps);
        Topology topology = this.buildTopology(new StreamsBuilder(), envProps);
        preCreateTopics(envProps);

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams instance cleanly on Ctrl-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

    private static KTable<Long, Double> getRatingAverageTable(
            KStream<Long, RatingOuterClass.Rating> ratings,
            String avgRatingsTopicName,
            ProtobufSerdes<Countsum.CountAndSum> countAndSumSerdes) {

        // Re-key every event by movie ID and keep only the rating value.
        KGroupedStream<Long, Double> ratingsById = ratings
                .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating.getRating()))
                .groupByKey(Grouped.with(Serdes.Long(), Serdes.Double()));

        // Maintain a running count and sum per movie.
        final KTable<Long, Countsum.CountAndSum> ratingCountAndSum = ratingsById.aggregate(
                () -> Countsum.CountAndSum.newBuilder().setCount(0L).setSum(0.0D).build(),
                (key, value, aggregate) -> Countsum.CountAndSum.newBuilder()
                        .setCount(aggregate.getCount() + 1)
                        .setSum(aggregate.getSum() + value)
                        .build(),
                Materialized.with(Serdes.Long(), countAndSumSerdes));

        // Derive the average from the aggregate.
        final KTable<Long, Double> ratingAverage = ratingCountAndSum.mapValues(
                value -> value.getSum() / value.getCount(),
                Materialized.as("average-ratings"));

        // Publish every update of the average to the output topic.
        ratingAverage.toStream().to(avgRatingsTopicName);
        return ratingAverage;
    }

    private Topology buildTopology(StreamsBuilder builder, Properties envProps) {
        final String ratingTopicName = envProps.getProperty("input.ratings.topic.name");
        final String avgRatingsTopicName = envProps.getProperty("output.rating-averages.topic.name");

        KStream<Long, RatingOuterClass.Rating> ratingStream =
                builder.stream(ratingTopicName, Consumed.with(Serdes.Long(), ratingSerdes()));
        getRatingAverageTable(ratingStream, avgRatingsTopicName, countAndSumSerdes());
        return builder.build();
    }

    // Creates the input and output topics if they do not exist yet.
    private static void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        String inputTopic = envProps.getProperty("input.ratings.topic.name");
        String outputTopic = envProps.getProperty("output.rating-averages.topic.name");

        try (AdminClient client = AdminClient.create(config)) {
            Collection<TopicListing> existingTopics = client.listTopics().listings().get();
            List<NewTopic> topics = new ArrayList<>();
            List<String> topicNames = existingTopics.stream()
                    .map(TopicListing::name)
                    .collect(Collectors.toList());

            if (!topicNames.contains(inputTopic)) {
                topics.add(new NewTopic(
                        inputTopic,
                        Integer.parseInt(envProps.getProperty("input.ratings.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.ratings.topic.replication.factor"))));
            }
            if (!topicNames.contains(outputTopic)) {
                topics.add(new NewTopic(
                        outputTopic,
                        Integer.parseInt(envProps.getProperty("output.rating-averages.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.rating-averages.topic.replication.factor"))));
            }
            if (!topics.isEmpty()) {
                client.createTopics(topics).all().get();
            }
        }
    }

    private Properties createStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.putAll(envProps);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
        // Disable record caching so that every update is forwarded downstream immediately.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, envProps.getProperty("default.topic.replication.factor"));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, envProps.getProperty("offset.reset.policy"));
        return props;
    }
}
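The logic of getRatingAverageTable can be followed without a running cluster. The sketch below imitates its three steps (initializer, aggregator, mapValues) with a plain HashMap playing the role of the state store. It is only an approximation of what Kafka Streams does: KeyedAverageSketch, its nested CountAndSum class, and the second movie ID 100 are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: mimics initializer + aggregator + mapValues with a HashMap, no Kafka classes.
final class KeyedAverageSketch {

    // Stand-in for the protobuf CountAndSum message.
    static final class CountAndSum {
        final long count;
        final double sum;

        CountAndSum(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }
    }

    // Plays the role of the state store behind the aggregated KTable.
    private final Map<Long, CountAndSum> store = new HashMap<>();

    // Handles one rating event and returns the updated average for that movie.
    double onRating(long movieId, double rating) {
        // Initializer: a new key starts from count = 0 and sum = 0.0.
        CountAndSum previous = store.getOrDefault(movieId, new CountAndSum(0L, 0.0));
        // Aggregator: build a new value from the previous aggregate and the event.
        CountAndSum updated = new CountAndSum(previous.count + 1, previous.sum + rating);
        store.put(movieId, updated);
        // mapValues: derive the average from the aggregate.
        return updated.sum / updated.count;
    }

    public static void main(String[] args) {
        KeyedAverageSketch sketch = new KeyedAverageSketch();
        long[] movieIds = {362L, 362L, 100L, 362L}; // movie 100 is a made-up second key
        double[] ratings = {9.6, 9.7, 5.0, 8.6};
        for (int i = 0; i < ratings.length; i++) {
            double average = sketch.onRating(movieIds[i], ratings[i]);
            System.out.printf("movie %d -> %.2f%n", movieIds[i], average);
        }
    }
}
```

Keeping count and sum, rather than only the previous average, is what lets each new rating update the result in constant time without rereading earlier events.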
7. Write a test producer
Now create src/main/java/huxihx/kafkastreams/tests/TestProducer.java with the following code:
package huxihx.kafkastreams.tests;

import huxihx.kafkastreams.proto.RatingOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);

        try (final Producer<String, RatingOuterClass.Rating> producer = new KafkaProducer<>(props)) {
            // The record has no key; the movie ID travels inside the message value.
            ProducerRecord<String, RatingOuterClass.Rating> event = new ProducerRecord<>(
                    "ratings",
                    RatingOuterClass.Rating.newBuilder()
                            .setMovieId(362)
                            .setRating(Double.parseDouble(args[0]))
                            .build());
            producer.send(event, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                }
            });
        }
    }
}
This test producer takes the rating from its first command-line argument; the movie ID is hard-coded to 362.
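Because the rating comes straight from the command line, running the producer with no argument or with a non-numeric one fails with an unhelpful exception. A small validation step could look like the sketch below; RatingArgSketch and parseRating are hypothetical names that are not part of the original TestProducer, and rejecting NaN and infinite values is an assumption of this sketch.

```java
// Sketch only: validates the single command-line argument before it is used as a rating.
final class RatingArgSketch {

    static double parseRating(String[] args) {
        if (args.length != 1) {
            throw new IllegalArgumentException("Usage: TestProducer <rating>");
        }
        final double rating;
        try {
            rating = Double.parseDouble(args[0]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Rating is not a number: " + args[0], e);
        }
        // Double.parseDouble accepts "NaN" and "Infinity"; reject them here.
        if (Double.isNaN(rating) || Double.isInfinite(rating)) {
            throw new IllegalArgumentException("Rating must be a finite number: " + args[0]);
        }
        return rating;
    }

    public static void main(String[] args) {
        System.out.println(parseRating(new String[] {"9.6"}));
    }
}
```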
8. Test
First, build the project:
$ ./gradlew shadowJar
Then start the Kafka cluster and run the Kafka Streams application:
$ java -Dconfig.file=configuration/dev.properties -jar build/libs/aggregating-average-standalone-0.0.1.jar
Now open a terminal and start a console consumer:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --group test-group --topic rating-averages --value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer
Because the average rating is a Double, the console consumer must use DoubleDeserializer as the deserializer for the message value.
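The reason is that a Double value travels as 8 raw bytes (the big-endian IEEE 754 bit pattern) rather than as text. The JDK-only sketch below reproduces that layout with ByteBuffer to show why the bytes only make sense when read back as a double. DoubleBytesSketch is an illustrative name, and the code is a stand-in for Kafka's Double serializer and deserializer, not a copy of them.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch only: an 8-byte big-endian double, written and read with the JDK.
final class DoubleBytesSketch {

    // ByteBuffer is big-endian by default.
    static byte[] encode(double value) {
        return ByteBuffer.allocate(8).putDouble(value).array();
    }

    static double decode(byte[] bytes) {
        if (bytes.length != 8) {
            throw new IllegalArgumentException("A double needs exactly 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getDouble();
    }

    public static void main(String[] args) {
        byte[] raw = encode(9.3);
        // Read as a double, the value comes back intact.
        System.out.println(decode(raw));
        // Read as text, the same bytes are not meaningful characters.
        System.out.println(new String(raw, StandardCharsets.UTF_8));
    }
}
```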
Next, open a terminal in the aggregating-average directory and run TestProducer several times to produce movie ratings:
$ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 9.6
$ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 9.7
$ java -cp build/libs/aggregating-average-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer 8.6
Now go back to the console consumer's terminal. You should see the following output:
9.6
9.65
9.3
This shows that the Kafka Streams app correctly computes the movie's average rating in real time.