Spring Kafka 教程 – spring讀取和傳送kakfa訊息
Apache Kafka, 分散式訊息系統, 非常流行。Spring是非常流行的Java快速開發框架。將兩者無縫平滑結合起來可以快速實現很多功能。本檔案簡要介紹Spring Kafka,如何使用 KafkaTemplate傳送訊息到kafka的broker上, 如何使用“listener container“接收Kafka訊息。
1,Spring Kafka的組成
這一節我們首先介紹Spring Kafka的各個組成部分。
1.1 傳送訊息
與 JmsTemplate 或者JdbcTemplate類似,Spring Kafka提供了 KafkaTemplate. 該模板封裝了Kafka訊息生產者並提供各種訊息傳送方法。
訊息傳送的各種方法。
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data);
ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
1.2 接收訊息
要接收訊息,我們需要配置MessageListenerContainer並提供一個Message Listener,或者使用 @KafkaListener註解。
MessageListenserContainer
MessageListenserContainer 有以下兩個實現類:
KafkaMessageListenerContainer ConcurrentMessageListenerContainer
KafkaMessageListenerContainer可以讓我們使用單執行緒消費Kafka topic的訊息,而ConcurrentMessageListenerContainer 可以讓我們多執行緒消費訊息。
@KafkaListener 註解
Spring Kafka提供的 @KafkaListener註解,可以讓我們監聽某個topic或者topicPattern的訊息。
監聽符合topicPattern = “topic.*”的所有topic的訊息
@Component
@Slf4j
public class CmdReceiver {
@KafkaListener(topicPattern = "topic.*")
public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =topic:" + topic+ ", " + record);
log.info("------------------ message =topic:" + topic+ ", " + message);
}
}
}
監聽某個topic的訊息
public class Listener {
@KafkaListener(id = "id01", topics = "Topic1")
public void listen(String data) {
}
}
2, Spring Kafka 例子
下面我們介紹一個具體的例子, 這個例會發送和接收指定topic的訊息。
準備工作:
kafka_2.11-1.1.0.tgz和zookeeper-3.4.10.tar.gz
JDK jdk-8u171-linux-x64.tar.gz
IDE (Eclipse or IntelliJ)
Build tool (Maven or Gradle)
本文不涉及安裝Kafka的介紹,請自行搜尋,或者看官方文件。
pom檔案:
也就是我們的依賴包. 這是筆者使用的依賴版本,僅供參考。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yq</groupId>
<artifactId>kafkademo</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-spring-web</artifactId>
<version>2.7.0</version>
</dependency>
<!-- fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.33</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
*KafkaDemoApplication*
我們使用springboot的框架,這是我們程式的入口點。
@SpringBootApplication
public class KafkaDemoApplication {
private static final Logger logger = LoggerFactory.getLogger(KafkaDemoApplication.class);
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);
logger.info("Done start Spring boot");
}
}
ProducerConfig
其實我們可以可以不用編寫KafkaProducerConfig,直接使用KafkaTemplate(當然前提是我們要設定好producer需要的配置項,例如spring.kafka.bootstrap-servers, spring.kafka.producer.key-serializer, spring.kafka.producer.retries等等)
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092(根據實際情況修改)");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
KafkaConsumerConfig
同理,其實我們可以可以不用編寫KafkaConsumerConfig,直接使用 @KafkaListener(當然前提是我們要設定好consumer需要的配置項,例如spring.kafka.bootstrap-servers, spring.kafka.consumer.key-deserializer, spring.kafka.consumer.group-id、spring.kafka.consumer.auto-offset-reset等等)
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092(根據實際情況修改)");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
}
@Bean
public MyListener listener() {
return new MyListener();
}
}
定義了ProducerConfig和ConsumerConfig後我們需要實現具體的生產者和消費者。
本文的KafkaListenerContainerFactory 中使用了ConcurrentKafkaListenerContainer, 我們將使用多執行緒消費訊息。
注意訊息代理的地址是localhost:9092, 需要根據實際情況修改。需要特別注意的是,我在windows執行程式,kafka在我的linux虛擬機器, 我需要配置windows的hosts檔案,配置虛擬機器hostname和ip的對映,例如192.168.119.131 ubuntu01
開發Listener
我們來開發自己的Listener監聽具體的topic, 這裡例子中我們監聽以topic開頭的主題,不做其他業務,只是打印出來。
@Component
@Slf4j
public class MyListener{
@KafkaListener(topicPattern = "topic.*")
public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("------------------ message =topic:" + topic+ ", " + message);
}
}
}
開發producer
我在程式中增加了controller,這樣我們可以通過controller給topic發訊息。consumer一直在監聽,只要有訊息傳送過去,就會打印出來。controller中呼叫了ProducerServiceImpl , 具體程式碼比較簡單就不再羅列。
我們producerServiceImpl主要是有這句, 通過KafkaTemplate 傳送訊息。
@Autowired
private KafkaTemplate template;
@Service
public class ProducerServiceImpl implements ProducerService {
private static final Logger logger = LoggerFactory.getLogger(ProducerServiceImpl.class);
private Gson gson = new GsonBuilder().create();
@Autowired
private KafkaTemplate template;
//傳送訊息方法
public void sendJson(String topic, String json) {
JSONObject jsonObj = JSON.parseObject(json);
jsonObj.put("topic", topic);
jsonObj.put("ts", System.currentTimeMillis() + "");
logger.info("json+++++++++++++++++++++ message = {}", jsonObj.toJSONString());
ListenableFuture<SendResult<String, String>> future = template.send(topic, jsonObj.toJSONString());
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("msg OK." + result.toString());
}
@Override
public void onFailure(Throwable ex) {
System.out.println("msg send failed: ");
}
});
}
執行程式
執行第一步,確保Kafka broker配置正確,筆者的程式在Windows10機器上,Kafka在虛擬機器上,因為我的地址是192.168.119.129:9092, 而不是localhost:9092.
執行第二步驟,在IDEA中選中KafkaDemoApplication , 單擊滑鼠右鍵,選擇 Run KafkaDemoApplication
效果圖
kafka段命令列接收到的訊息
3,總結
Spring Kafka提供了很好的整合,我們只需配置properties檔案,就可以直接使用KafkaTemplate傳送訊息,使用@KafkaListener監聽訊息。