
Kafka Streams examples with the Java API (word count and sum)


1. Start Kafka

zkServer.sh start                                        # start ZooKeeper
kafka-server-start.sh -daemon /opt/server.properties    # start the Kafka broker in the background
kafka-topics.sh --zookeeper 192.168.116.60:2181 --list  # list the existing topics

2. Create the producer/consumer topics and start a console producer and consumer

Create the producer (input) topic mystreamin:

kafka-topics.sh --create --zookeeper 192.168.116.60:2181 --topic mystreamin --partitions 1 --replication-factor 1

Create the consumer (output) topic mystreamout:

kafka-topics.sh --create --zookeeper 192.168.116.60:2181 --topic mystreamout --partitions 1 --replication-factor 1

Start a console producer on the input topic:

kafka-console-producer.sh --topic mystreamin --broker-list 192.168.116.60:9092

Open a new terminal session and start a console consumer on the output topic:

kafka-console-consumer.sh --topic mystreamout --bootstrap-server 192.168.116.60:9092 --from-beginning
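
The same topics can also be created from Java instead of the CLI, using the AdminClient API from kafka-clients. The sketch below is a hypothetical helper (the class name CreateTopics is not part of the original example) and assumes the Maven dependencies listed in section 3.1.1 are already on the classpath:

package cn.bright.kafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

/**
 * Hypothetical helper: creates the input and output topics programmatically
 * instead of using kafka-topics.sh.
 */
public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.60:9092");

        try (AdminClient admin = AdminClient.create(prop)) {
            NewTopic in  = new NewTopic("mystreamin", 1, (short) 1);   // 1 partition, replication factor 1
            NewTopic out = new NewTopic("mystreamout", 1, (short) 1);
            admin.createTopics(Arrays.asList(in, out)).all().get();    // block until both topics exist
        }
    }
}

Afterwards, the kafka-topics.sh --list command from step 1 can be used to confirm that both topics were created.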

3. Java code using the Kafka Streams API

3.1 Forwarding data between topics (pass-through)

Goal: write the data from the mystreamin topic into the mystreamout topic.

3.1.1 Maven dependencies

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>2.0.0</version>
    </dependency>
  </dependencies>

3.1.2 Java code

package cn.bright.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Author Bright
 * @Date 2020/12/15
 * @Description
 */
public class MyStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"mystream");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.116.60:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        // create the streams builder
        StreamsBuilder builder = new StreamsBuilder();

        // build the topology: forward every record from the mystreamin topic to the mystreamout topic
        builder.stream("mystreamin").to("mystreamout");

        final Topology topo =builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream"){

            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
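
Once MyStream is running, every message produced to mystreamin should appear on mystreamout. As an alternative to the console producer from step 2, the input topic can also be fed from Java; the sketch below is a minimal, hypothetical helper (the class name TestProducer is not part of the original example) that only uses the kafka-clients producer API pulled in by the dependencies above:

package cn.bright.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Hypothetical helper: sends a few test messages to mystreamin
 * so that MyStream can forward them to mystreamout.
 */
public class TestProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.60:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            for (int i = 0; i < 3; i++) {
                // keys are left null, matching what the console producer sends
                producer.send(new ProducerRecord<>("mystreamin", "test message " + i));
            }
        }
    }
}

With the console consumer from step 2 still attached to mystreamout, running this class should print the three test messages there.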

3.2 Word count stream example

3.2.1 Following the steps above, create the topics and start a console producer and consumer:

Producer (input) topic: wordcount-input
Consumer (output) topic: wordcount-output

3.2.2 Java code

package cn.bright.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Author Bright
 * @Date 2020/12/15
 * @Description
 */
public class WordCountStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.116.60:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        // create the streams builder
        // example input on wordcount-input:
        //          hello world
        //          hello java
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> count = builder.stream("wordcount-input")      // read records from Kafka one at a time
                .flatMapValues((value) -> {              // flatten each line of text into individual words
                            String[] split = value.toString().split(" ");  // split the value on spaces
                            List<String> strings = Arrays.asList(split);
                            return strings;
                        }
                )     // key/value pairs are now: null->hello, null->world, null->hello, null->java
                .map((k, v) -> {
                    return new KeyValue<String, String>(v, "1");   // promote each word to the record key
                }).groupByKey().count();                           // group identical words and count occurrences

        // print every updated count locally for debugging
        count.toStream().foreach((k,v)->{
            System.out.println("key:"+k+"   value:"+v);
        });

        // convert the Long counts to Strings (the default serde is String) and write them to the output topic
        count.toStream().map((x,y)->{
            return new KeyValue<String,String>(x,y.toString());
        }).to("wordcount-output");



        final Topology topo =builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream"){

            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
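
The final map step above is only needed because the topology's default value serde is String while count() produces Long values. As a variant, the counts could be kept as Longs and the serdes declared explicitly when writing. The lines below are only a sketch that would replace the last count.toStream()...to("wordcount-output") block; they assume an extra import of org.apache.kafka.streams.kstream.Produced, and the output topic would then have to be read with a Long value deserializer rather than the default console consumer settings:

        // write the counts as Longs, declaring the output serdes explicitly
        count.toStream().to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));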

3.3 Sum stream example

Goal: read the numbers produced to the sum-input topic, keep a running sum, and write it to the sum-output topic. For example, producing 3, 5, and 7 yields 3, 8, and 15 on sum-output.

3.3.1 Following the steps above, create the topics and start a console producer and consumer:

Producer (input) topic: sum-input
Consumer (output) topic: sum-output

3.3.2 Java code

package cn.bright.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @Author Bright
 * @Date 2020/12/15
 * @Description
 */
public class SumStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"sum");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.116.60:9092");
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        // create the streams builder
        // sum the numbers arriving on sum-input
        StreamsBuilder builder = new StreamsBuilder();
        KStream<Object, Object> source = builder.stream("sum-input");
        KTable<String, String> sum1 = source.map((key, value) ->
                new KeyValue<String, String>("sum", value.toString())   // same key for every record, so all numbers fall into one group
        ).groupByKey().reduce((x, y) -> {                               // x is the running total so far, y is the newly arrived number
            System.out.println("x: " + x + " " + "y: " + y);
            Integer sum = Integer.valueOf(x) + Integer.valueOf(y);
            System.out.println("sum: " + sum);
            return sum.toString();
        });


        sum1.toStream().to("sum-output");

        final Topology topo =builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("stream"){

            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
}
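
Because the default serde is String, the reduce above keeps the running total as text and re-parses it for every record. A variant that stores the total as a Long is sketched below; it is only an illustration, assumes the same configuration and topics as SumStream, and needs an extra import of org.apache.kafka.streams.kstream.Materialized:

        // drop-in replacement for the map / groupByKey / reduce block in SumStream
        KTable<String, Long> sumTable = source
                .map((key, value) -> new KeyValue<String, String>("sum", value.toString()))  // one shared key, so all numbers land in one group
                .groupByKey()
                .aggregate(
                        () -> 0L,                                               // initial running total
                        (key, value, total) -> total + Long.parseLong(value),   // add each incoming number
                        Materialized.with(Serdes.String(), Serdes.Long()));     // keep the running total as a Long

        // convert back to String so the console consumer can read the output with default settings
        sumTable.toStream()
                .map((k, v) -> new KeyValue<String, String>(k, v.toString()))
                .to("sum-output");

For a small demo the String-based reduce is the simpler choice; the aggregate variant mainly avoids repeatedly parsing the running total from text.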