1. 程式人生 > >Kafka Stream 類庫的使用入門

Kafka Stream 類庫的使用入門

一,Kafka Stream簡介

Kafka Streams。Apache Kafka開源專案的一個組成部分。是一個功能強大,易於使用的庫。用於在Kafka上構建高可分散式、拓展性,容錯的應用程式。它建立在流處理的一系列重要功能基礎之上,比如正確區分事件事件和處理時間,處理遲到資料以及高效的應用程式狀態管理。總而言之,Kafka Stream 並不是像Hadoop spark等一樣的框架,而僅僅是一個類庫而已。

二,Kafka Stream常用API

      1,builder.stream(String topic, Consumed<K, V> consumed)            用於讀取kafka裡的資料。  

      2,builder.addStateStore()       建立容器用來儲存你的資料     3,context.getStateStore()       獲取儲存的資料資訊

    4,context().commit

       提交當前的處理進度

    5,context.getStateStore()       獲取儲存的資料資訊(儲存本地的)

三,Kafka Stream類庫的基礎知識

      1,process方法:每讀取到一條資料,這個方法都會執行一遍      2,punctuate方法:週期性的執行該方法,週期時間在init方法中呼叫schedule方法設定。 
      3,Kstream:事件流,資料的記錄的方式是追加(insert),後面的資料會追加到之前的資料裡。可以理解為你的Kafka Stream讀取topic資料時,就是儲存在這             裡的。      4,Ktable:changelog流,資料的記錄的方式是更新(update),相同的key後面的資料會覆蓋掉前面的資料。PS:由於Kstream和Ktable這兩種特性,我們可以知道Kstream是不安全的,因為一旦日誌資料出現壓縮了,之前的key值就被刪除了。這樣進入的資料方式就變成了更新。

四,程式的程式碼框架

      第一部分,拓撲(Topology)

           整個框架的程式的入口,main方法也是寫在這裡。   

     第二部分,processor

           相當於storm中的spout和bolt角色。   

     第三部分,自己的實現層

           具體指標的實現的地方。

ps:

1,下面是官方給出的wordcount案例,大家可以參考下(這個例子用了lambda表示式,JAVA8的新特性)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
        
import java.util.Arrays;
import java.util.Properties;
        
public class WordCountApplication {
        
    public static void main(final String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
    }
        
}

2,下面這個是以Kafka Stream的processor方式計算wordcount的程式碼

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;

  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }

  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }

  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }

  @Override
  public void close() {
    this.kvStore.close();
  }

}