1. 程式人生 > 其它 >Kafka Stream API

Kafka Stream API

技術標籤:kafka


Kafka Stream 是什麼?

大家都以為kafka只是一個訊息佇列,具有分散式,高併發,低延遲或者快速響應等多種特定,
但是kafka官方卻給自己定義自己一個開源的分散式事件流平臺.那麼啥叫流平臺? 就是自己可以做流的計算清洗和簡單的處理,多的我不想多說,咋們直接上程式碼吧

public class StreamSample {

    private static final String INPUT_TOPIC="jiangzh-stream-in";
    private static final String OUT_TOPIC=
"jiangzh-stream-out"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092"); props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app"); props.
put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // 如果構建流結構拓撲 final StreamsBuilder builder = new StreamsBuilder(); // 構建Wordcount // wordcountStream(builder);
// 構建foreachStream foreachStream(builder); final KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } // 如果定義流計算過程 static void foreachStream(final StreamsBuilder builder){ KStream<String,String> source = builder.stream(INPUT_TOPIC); source .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) .foreach((key,value)-> System.out.println(key + " : " + value)); } // 如果定義流計算過程 static void wordcountStream(final StreamsBuilder builder){ // 不斷從INPUT_TOPIC上獲取新資料,並且追加到流上的一個抽象物件 KStream<String,String> source = builder.stream(INPUT_TOPIC); // Hello World imooc // KTable是資料集合的抽象物件 // 運算元 final KTable<String, Long> count = source // flatMapValues -> 將一行資料拆分為多行資料 key 1 , value Hello World // flatMapValues -> 將一行資料拆分為多行資料 key 1 , value Hello key xx , value World /* key 1 , value Hello -> Hello 1 World 2 key 2 , value World key 3 , value World */ .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) // 合併 -> 按value值合併 .groupBy((key, value) -> value) // 統計出現的總數 .count(); // 將結果輸入到OUT_TOPIC中 count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(),Serdes.Long())); } }