java API操作實現kafka Stream例項(word count和sum功能)
阿新 • • 發佈:2020-12-18
技術標籤:kafkakafka Streamjava API
java API操作實現kafka Stream
1.啟動kafka
zkServer.sh start //啟動zookeeper
kafka-server-start.sh -daemon /opt/server.properties //後臺啟動kafka
kafka-topics.sh --zookeeper 192.168.116.60:2181 --list //檢視佇列資訊
2.建立並啟動生產者和消費者topic
建立生產者topic: mystreamin
kafka-topics.sh --create --zookeeper 192.168.116.60:2181 --topic mystreamin --partitions 1 --replication-factor 1
建立消費者topic: mystreamout
kafka-topics.sh --create --zookeeper 192.168.116.60:2181 --topic mystreamout --partitions 1 --replication-factor 1
啟動生產者topic
kafka-console-producer.sh --topic mystreamin --broker-list 192.168.116.60:9092
新開一個會話session
啟動消費者topic
kafka-console-consumer.sh --topic mystreamout --bootstrap-server 192.168.116.60:9092 --from-beginning
3.java程式碼操作kafkaAPI
3.1 資料同步輸出
功能:將mystreamin topic中的資料寫入到mystreamout topic中
3.1.1 maven依賴
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
3.1.2 java程式碼執行
package cn.bright.kafka;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* @Author Bright
* @Date 2020/12/15
* @Description
*/
/**
 * Pass-through Kafka Streams app: copies every record from the
 * "mystreamin" topic to the "mystreamout" topic unchanged.
 *
 * @Author Bright
 * @Date 2020/12/15
 */
public class MyStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "mystream");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.60:9092");
        // Keys and values are plain strings by default.
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build the topology: read from mystreamin, forward every record to mystreamout.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("mystreamin").to("mystreamout");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams client cleanly on Ctrl-C / JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the signal
            // (printStackTrace() alone loses the interruption).
            Thread.currentThread().interrupt();
        }
        System.exit(0);
    }
}
3.2 wordcount stream 例項
3.2.1 按上面的步驟新建立並執行 :
生產者topic:wordcount-input
消費者topic:wordcount-output
3.2.2 java程式碼執行
package cn.bright.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* @Author Bright
* @Date 2020/12/15
* @Description
*/
/**
 * Word-count Kafka Streams app: reads lines from "wordcount-input",
 * splits them on spaces, counts occurrences of each word, prints each
 * updated (word, count) pair, and writes the counts to "wordcount-output".
 *
 * @Author Bright
 * @Date 2020/12/15
 */
public class WordCountStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.60:9092");
        // Commit (and thus emit downstream) every 500 ms for quicker feedback.
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Example input on wordcount-input:
        //   hello world
        //   hello java
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> count = builder.stream("wordcount-input")
                // Split each line on spaces -> one record per word
                // (keys are still null at this point).
                .flatMapValues(value -> Arrays.asList(value.toString().split(" ")))
                // Re-key each record by the word itself so groupByKey()
                // groups identical words; the "1" placeholder value is
                // ignored by count(), which counts records per key.
                .map((k, v) -> new KeyValue<>(v, "1"))
                .groupByKey()
                .count();

        // Debug output: print every updated (word, count) pair.
        count.toStream().foreach((k, v) -> {
            System.out.println("key:" + k + " value:" + v);
        });

        // Publish the counts (stringified) to the output topic.
        count.toStream().map((x, y) -> {
            return new KeyValue<String, String>(x, y.toString());
        }).to("wordcount-output");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams client cleanly on Ctrl-C / JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the signal.
            Thread.currentThread().interrupt();
        }
        System.exit(0);
    }
}
3.3 sum stream 例項
功能:將sum-input topic中輸入的數字寫入到sum-output topic中同時進行求和
3.3.1 按上面的步驟新建立並執行 :
生產者topic:sum-input
消費者topic:sum-output
3.3.2 java程式碼執行
package cn.bright.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* @Author Bright
* @Date 2020/12/15
* @Description
*/
/**
 * Running-sum Kafka Streams app: reads integer strings from "sum-input",
 * re-keys every record under the constant key "sum" so they all land in
 * one group, folds them with reduce() into a running total, and writes
 * each updated total to "sum-output".
 *
 * @Author Bright
 * @Date 2020/12/15
 */
public class SumStream {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sum");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.60:9092");
        // Commit (and thus emit downstream) every 500 ms for quicker feedback.
        prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Object, Object> source = builder.stream("sum-input");
        // All records share the key "sum", so reduce() maintains a single
        // accumulated total across every input number.
        KTable<String, String> sum1 = source.map((key, value) ->
                new KeyValue<String, String>("sum", value.toString())
        ).groupByKey().reduce((x, y) -> {
            System.out.println("x: " + x + " " + "y: " + y);
            // parseInt avoids the needless Integer boxing of valueOf;
            // NOTE(review): inputs are assumed to be valid ints that fit
            // in 32 bits — a non-numeric record would crash the task.
            int sum = Integer.parseInt(x) + Integer.parseInt(y);
            System.out.println("sum: " + sum);
            return Integer.toString(sum);
        });
        sum1.toStream().to("sum-output");

        final Topology topo = builder.build();
        final KafkaStreams streams = new KafkaStreams(topo, prop);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams client cleanly on Ctrl-C / JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread("stream") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        streams.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the signal.
            Thread.currentThread().interrupt();
        }
        System.exit(0);
    }
}