Java實現Kafka生產者和消費者的示例
阿新 • • 發佈:2021-01-05
### Kafka簡介
Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka的目標是為處理實時資料提供一個統一、高吞吐、低延遲的平臺。
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
![](https://img2020.cnblogs.com/blog/145687/202101/145687-20210105100326000-1776867725.png)
### 方式一:kafka-clients
#### 引入依賴
在pom.xml檔案中,引入kafka-clients依賴:
```xml
org.apache.kafka
kafka-clients
2.3.1
```
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
#### 生產者
建立一個KafkaProducer的生產者例項:
```java
@Configuration
public class Config {
public final static String bootstrapServers = "127.0.0.1:9092";
@Bean(destroyMethod = "close")
public KafkaProducer kafkaProducer() {
Properties props = new Properties();
//設定Kafka伺服器地址
props.put("bootstrap.servers", bootstrapServers);
//設定資料key的序列化處理類
props.put("key.serializer", StringSerializer.class.getName());
//設定資料value的序列化處理類
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer producer = new KafkaProducer<>(props);
return producer;
}
}
```
文章持續更新,微信搜尋「萬貓學社 」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
在Controller中進行使用:
```java
@RestController
@Slf4j
public class Controller {
@Autowired
private KafkaProducer kafkaProducer;
@RequestMapping("/kafkaClientsSend")
public String send() {
String uuid = UUID.randomUUID().toString();
RecordMetadata recordMetadata = null;
try {
//將訊息傳送到Kafka伺服器的名稱為“one-more-topic”的Topic中
recordMetadata = kafkaProducer.send(new ProducerRecord<>("one-more-topic", uuid)).get();
log.info("recordMetadata: {}", recordMetadata);
log.info("uuid: {}", uuid);
} catch (Exception e) {
log.error("send fail, uuid: {}", uuid, e);
}
return uuid;
}
}
```
文章持續更新,微信搜尋「萬貓學社 」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
#### 消費者
建立一個KafkaConsumer的消費者例項:
```java
@Configuration
public class Config {
public final static String groupId = "kafka-clients-group";
public final static String bootstrapServers = "127.0.0.1:9092";
@Bean(destroyMethod = "close")
public KafkaConsumer kafkaConsumer() {
Properties props = new Properties();
//設定Kafka伺服器地址
props.put("bootstrap.servers", bootstrapServers);
//設定消費組
props.put("group.id", groupId);
//設定資料key的反序列化處理類
props.put("key.deserializer", StringDeserializer.class.getName());
//設定資料value的反序列化處理類
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props);
//訂閱名稱為“one-more-topic”的Topic的訊息
kafkaConsumer.subscribe(Arrays.asList("one-more-topic"));
return kafkaConsumer;
}
}
```
文章持續更新,微信搜尋「萬貓學社 」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
在Controller中進行使用:
```java
@RestController
@Slf4j
public class Controller {
@Autowired
private KafkaConsumer kafkaConsumer;
@RequestMapping("/receive")
public List receive() {
////從Kafka伺服器中的名稱為“one-more-topic”的Topic中消費訊息
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1));
List messages = new ArrayList<>(records.count());
for (ConsumerRecord record : records.records("one-more-topic")) {
String message = record.value();
log.info("message: {}", message);
messages.add(message);
}
return messages;
}
}
```
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
### 方式二:spring-kafka
使用kafka-clients需要我們自己建立生產者或者消費者的bean,如果我們的專案基於SpringBoot構建,那麼使用spring-kafka就方便多了。
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
#### 引入依賴
在pom.xml檔案中,引入spring-kafka依賴:
```xml
org.springframework.kafka
spring-kafka
2.3.12.RELEASE
```
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
#### 生產者
在application.yml檔案中增加配置:
```yml
spring:
kafka:
#Kafka伺服器地址
bootstrap-servers: 127.0.0.1:9092
producer:
#設定資料value的序列化處理類
value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
在Controller中注入KafkaTemplate就可以直接使用了,程式碼如下:
```java
@RestController
@Slf4j
public class Controller {
@Autowired
private KafkaTemplate template;
@RequestMapping("/springKafkaSend")
public String send() {
String uuid = UUID.randomUUID().toString();
//將訊息傳送到Kafka伺服器的名稱為“one-more-topic”的Topic中
this.template.send("one-more-topic", uuid);
log.info("uuid: {}", uuid);
return uuid;
}
}
```
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
#### 消費者
在application.yml檔案中增加配置:
```yml
spring:
kafka:
#Kafka伺服器地址
bootstrap-servers: 127.0.0.1:9092
consumer:
#設定資料value的反序列化處理類
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
文章持續更新,微信搜尋「萬貓學社」第一時間閱讀,關注後回覆「電子書」,免費獲取12本Java必讀技術書籍。
建立一個可以被Spring框架掃描到的類,並且在方法上加上@KafkaListener註解,就可以消費訊息了,程式碼如下:
```java
@Component
@Slf4j
public class Receiver {
@KafkaListener(topics = "one-more-topic", groupId = "spring-kafka-group")
public void listen(ConsumerRecord, ?> record) {
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
String message = (String) kafkaMessage.get();
log.info("message: {}", message);
}
}
}
```
微信公眾號:萬貓學社
微信掃描二維碼
關注後回覆「電子書」
獲取12本Java必讀技術書籍