Kafka Stream 類庫的使用入門
阿新 • • 發佈:2019-01-25
一,Kafka Stream簡介
Kafka Streams。Apache Kafka開源專案的一個組成部分。是一個功能強大,易於使用的庫。用於在Kafka上構建高可分散式、拓展性,容錯的應用程式。它建立在流處理的一系列重要功能基礎之上,比如正確區分事件事件和處理時間,處理遲到資料以及高效的應用程式狀態管理。總而言之,Kafka Stream 並不是像Hadoop spark等一樣的框架,而僅僅是一個類庫而已。
二,Kafka Stream常用API
1,builder.stream(String topic, Consumed<K, V> consumed) 用於讀取kafka裡的資料。2,builder.addStateStore() 建立容器用來儲存你的資料 3,context.getStateStore() 獲取儲存的資料資訊
4,context().commit
提交當前的處理進度
5,context.getStateStore() 獲取儲存的資料資訊(儲存本地的)三,Kafka Stream類庫的基礎知識
1,process方法:每讀取到一條資料,這個方法都會執行一遍 2,punctuate方法:週期性的執行該方法,週期時間在init方法中呼叫schedule方法設定。四,程式的程式碼框架
第一部分,拓撲(Topology)整個框架的程式的入口,main方法也是寫在這裡。
相當於storm中的spout和bolt角色。
第三部分,自己的實現層具體指標的實現的地方。
ps:
1,下面是官方給出的wordcount案例,大家可以參考下(這個例子用了lambda表示式,JAVA8的新特性)
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
}
2,下面這個是以Kafka Stream的processor方式計算wordcount的程式碼
public class WordCountProcessor implements Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
}
@Override
public void process(String key, String value) {
Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
int count = counts.map(wordcount -> wordcount + 1).orElse(1);
kvStore.put(word, count);
});
}
@Override
public void punctuate(long timestamp) {
KeyValueIterator<String, Integer> iterator = this.kvStore.all();
iterator.forEachRemaining(entry -> {
context.forward(entry.key, entry.value);
this.kvStore.delete(entry.key);
});
context.commit();
}
@Override
public void close() {
this.kvStore.close();
}
}