使用kafka Streams統計單詞出現的次數
阿新 • • 發佈:2020-12-19
1.實現邏輯:
統計生產者生產的訊息,處理邏輯:統計每個單詞出現的次數,並將結果輸出到目標主題中
2.Producer端:
生產指定格式的資料:
生產資料: kafka-console-producer.sh --topic t1 --broker-list zjuserMaster:9092
dihf java pyton java scala scala java c c++ java
java java js c c java
^[[A
java js kjs =^Hj
java js java scala scala
java js java
java js java js
3.處理邏輯程式碼
`package com.zj.geek.kafka.StreamsDSL; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; /** * @author Zhou Jian * @Description kafka DSL處理例項 * @createTime 2020年12月19日 15:07:00 */ /* 統計生產者生產的訊息,處理邏輯:統計每個單詞出現的次數,並將結果輸出到目標主題中 */ public class DSL { public static void main(String[] args) { Properties props = new Properties() {{ // application.id 是 Streams 程式中非常關鍵的引數,你必須要指定一個叢集範圍內唯一的字串來標識你的 Kafka Streams 程式 put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-demo"); // 連線的kafka目標叢集 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "zjuserMaster:9092"); // 序列化和反序列化類 put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // 自動重置消費者位移 earliest: automatically reset the offset to the earliest offset put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); }}; // 建立StreamsBuilder物件 StreamsBuilder builder = new StreamsBuilder(); // 從主題t1中讀取訊息 KStream<String, String> source = builder.stream("t1"); // 將流轉換為表 --> KStream ~ KTable // String, Long --> 單詞,出現的次數 KTable<String, Long> counts = source /* 先對單詞進行分割,這裡我用到了 flatMapValues 方法, 程式碼中的 Lambda 表示式實現了從訊息中提取單詞的邏輯。 由於 String.split() 方法會返回多個單詞, 因此我們使用 flatMapValues 而不是 mapValues。 原因是,前者能夠將多個元素“打散”成一組單詞, 而如果使用後者,我們得到的就不是一組單詞,而是多組單詞了。 */ .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) /* 呼叫 groupBy 方法對單詞進行分組。 由於是計數,相同的單詞必須被分到一起, */ .groupBy((key, value) -> value) /* 然後就是呼叫 count 方法對每個出現的單詞進行統計計數, 並儲存在名為 counts 的 KTable 物件中。 */ .count(); /* 最後,我們將統計結果寫回到 Kafka 中。由於 KTable 是表,是靜態的資料, 因此這裡要先將其轉換成 KStream, 然後再呼叫 to 方法寫入到名為 wordCount 的主題中。 此時,counts 中事件的 Key 是單詞,而 Value 是統計個數, 因此我們在呼叫 to 方法時,同時指定了 Key 和 Value 的序列化器,分別是字串序列化器和長整型序列化器。 */ counts.toStream().to("wordCount", Produced.with(Serdes.String(), Serdes.Long())); // 構造 KafkaStreams 例項並啟動它了 KafkaStreams streams = new KafkaStreams(builder.build(), props); CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-stream-demo-jvm-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); } }`
4.結果顯示
kafka-console-consumer.sh --bootstrap-server zjuserMaster:9092 \
--topic wordCount
--from-beginning
--formatter kafka.tools.DefaultMessageFormatter
--property print.key=true
--property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
dihf 1
java 4
java 7
1
java 8
kjs 1
j 1
java 10
java 12
java 14
pyton 1
scala 2