Kafka Stream API
阿新 • • 發佈:2020-12-09
技術標籤:kafka
Kafka Stream 是什麼?
大家都以為kafka只是一個訊息佇列,具有分散式,高併發,低延遲或者快速響應等多種特定,
但是kafka官方卻給自己定義自己一個開源的分散式事件流平臺.那麼啥叫流平臺? 就是自己可以做流的計算清洗和簡單的處理,多的我不想多說,咋們直接上程式碼吧
public class StreamSample {
private static final String INPUT_TOPIC="jiangzh-stream-in";
private static final String OUT_TOPIC= "jiangzh-stream-out";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
props. put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 如果構建流結構拓撲
final StreamsBuilder builder = new StreamsBuilder();
// 構建Wordcount
// wordcountStream(builder);
// 構建foreachStream
foreachStream(builder);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
// 如果定義流計算過程
static void foreachStream(final StreamsBuilder builder){
KStream<String,String> source = builder.stream(INPUT_TOPIC);
source
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.foreach((key,value)-> System.out.println(key + " : " + value));
}
// 如果定義流計算過程
static void wordcountStream(final StreamsBuilder builder){
// 不斷從INPUT_TOPIC上獲取新資料,並且追加到流上的一個抽象物件
KStream<String,String> source = builder.stream(INPUT_TOPIC);
// Hello World imooc
// KTable是資料集合的抽象物件
// 運算元
final KTable<String, Long> count =
source
// flatMapValues -> 將一行資料拆分為多行資料 key 1 , value Hello World
// flatMapValues -> 將一行資料拆分為多行資料 key 1 , value Hello key xx , value World
/*
key 1 , value Hello -> Hello 1 World 2
key 2 , value World
key 3 , value World
*/
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
// 合併 -> 按value值合併
.groupBy((key, value) -> value)
// 統計出現的總數
.count();
// 將結果輸入到OUT_TOPIC中
count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(),Serdes.Long()));
}
}