1. 程式人生 > 實用技巧 >使用kafka Streams統計單詞出現的次數

使用kafka Streams統計單詞出現的次數

1.實現邏輯:

統計生產者生產的訊息,處理邏輯:統計每個單詞出現的次數,並將結果輸出到目標主題中

2.Producer端:

生產指定格式的資料:
生產資料: kafka-console-producer.sh --topic t1 --broker-list zjuserMaster:9092
dihf java pyton java scala scala java c c++ java
java java js c c java
^[[A
java js kjs =^Hj
java js java scala scala
java js java
java js java js

3.處理邏輯程式碼

`package com.zj.geek.kafka.StreamsDSL;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Produced;

 import java.util.Arrays;
 import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;

 /**
 * @author Zhou Jian
 * @Description kafka DSL處理例項
 * @createTime 2020年12月19日 15:07:00
 */
/*
統計生產者生產的訊息,處理邏輯:統計每個單詞出現的次數,並將結果輸出到目標主題中
*/
public class DSL {
   public static void main(String[] args) {
      Properties props = new Properties() {{
        // application.id 是 Streams 程式中非常關鍵的引數,你必須要指定一個叢集範圍內唯一的字串來標識你的 Kafka Streams 程式
        put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-demo");
        // 連線的kafka目標叢集
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "zjuserMaster:9092");
        // 序列化和反序列化類
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // 自動重置消費者位移 earliest: automatically reset the offset to the earliest offset
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }};
    // 建立StreamsBuilder物件
    StreamsBuilder builder = new StreamsBuilder();
    // 從主題t1中讀取訊息
    KStream<String, String> source = builder.stream("t1");
    // 將流轉換為表 --> KStream ~  KTable
    // String, Long  --> 單詞,出現的次數
    KTable<String, Long> counts = source
            /*
            先對單詞進行分割,這裡我用到了 flatMapValues 方法,
            程式碼中的 Lambda 表示式實現了從訊息中提取單詞的邏輯。
            由於 String.split() 方法會返回多個單詞,
            因此我們使用 flatMapValues 而不是 mapValues。
            原因是,前者能夠將多個元素“打散”成一組單詞,
            而如果使用後者,我們得到的就不是一組單詞,而是多組單詞了。
             */
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            /*
            呼叫 groupBy 方法對單詞進行分組。
            由於是計數,相同的單詞必須被分到一起,
             */
            .groupBy((key, value) -> value)
            /*
            然後就是呼叫 count 方法對每個出現的單詞進行統計計數,
            並儲存在名為 counts 的 KTable 物件中。
             */
            .count();
    /*
        最後,我們將統計結果寫回到 Kafka 中。由於 KTable 是表,是靜態的資料,
        因此這裡要先將其轉換成 KStream,
        然後再呼叫 to 方法寫入到名為 wordCount 的主題中。
        此時,counts 中事件的 Key 是單詞,而 Value 是統計個數,
        因此我們在呼叫 to 方法時,同時指定了 Key 和 Value 的序列化器,分別是字串序列化器和長整型序列化器。
     */
    counts.toStream().to("wordCount", Produced.with(Serdes.String(), Serdes.Long()));
    // 構造 KafkaStreams 例項並啟動它了
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    CountDownLatch latch = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread("wordcount-stream-demo-jvm-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });


    try {
        streams.start();
        latch.await();
    } catch (final Throwable e) {
        System.exit(1);
    }
    System.exit(0);
   }
 }`

4.結果顯示

kafka-console-consumer.sh --bootstrap-server zjuserMaster:9092 \

--topic wordCount
--from-beginning
--formatter kafka.tools.DefaultMessageFormatter
--property print.key=true
--property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

dihf 1
java 4
java 7
1
java 8
kjs 1
j 1
java 10
java 12
java 14
pyton 1
scala 2