使用Spring Kafka(spring-kafka)操作kafka訊息的傳送和訂閱
阿新 • • 發佈:2019-02-05
本專案是在Spring Boot的基礎上構建的,筆者使用的是Spring Boot 1.5.8版本。
1.在專案的pom.xml檔案中引入如下依賴項:
<!-- Kafka client library -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
<!-- spring-kafka integration library -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.0.RELEASE</version>
</dependency>
備註:最好讓依賴的kafka jar版本和你的kafka訊息中介軟體版本保持一致,否則可能會出現意想不到的問題。
2.在application.properties檔案中加入kafka所需要的配置資訊
## Kafka-related configuration
## Kafka cluster address list
kafka.bootstrap-servers=192.168.74.80:9092,192.168.74.81:9092
## Kafka message topic
kafka.dataacquisition.topic=dataacquisition
3.Producer端配置
3.1.KafkaProducerConfig
/**
 * Configuration class that declares the producer-side Kafka beans:
 * a {@code ProducerFactory} and a {@code KafkaTemplate}, both typed
 * for String keys and String values.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Broker address list, injected from {@code kafka.bootstrap-servers}. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Topic name, injected from {@code kafka.dataacquisition.topic}. */
    @Value("${kafka.dataacquisition.topic}")
    private String topic;

    /**
     * Returns the topic name injected into this configuration.
     *
     * @return the value of {@code kafka.dataacquisition.topic}
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Creates the producer factory bean.
     *
     * <p>The factory is configured with the injected broker list and uses
     * {@code StringSerializer} for both record keys and record values.
     *
     * @return a producer factory for String/String records
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the Kafka template bean on top of {@link #producerFactory()}.
     *
     * @return a template for sending String/String records
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}
3.2.Producer
/**
 * Component that hands String messages to the injected
 * {@code KafkaTemplate} and logs before and after the call.
 */
@Component
public class Producer {

    /** Class-wide logger. */
    private static final Logger logger = LogManager.getLogger(Producer.class);

    /** Template used to publish messages; injected by the container. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Publishes {@code message} to {@code topic}.
     *
     * <p>The value returned by the template's {@code send} call is not
     * inspected here, so the closing debug line only marks that the call
     * returned.
     * NOTE(review): presumably delivery is asynchronous and failures would
     * surface through that return value - confirm against the spring-kafka
     * version in use.
     *
     * @param topic   the topic to publish to
     * @param message the message payload
     */
    public void send(String topic, String message) {
        String startLine = "傳送訊息到kafka訊息系統, message:" + message;
        logger.debug(startLine);

        this.kafkaTemplate.send(topic, message);

        logger.debug("傳送訊息到kafka訊息系統結束");
    }
}
4.Consumer端
4.1.KafkaConsumerConfig
/**
 * Configuration class that declares the consumer-side Kafka beans:
 * a {@code ConsumerFactory} and the listener container factory built on it,
 * both typed for String keys and String values.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Broker address list, injected from {@code kafka.bootstrap-servers}. */
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Topic name, injected from {@code kafka.dataacquisition.topic}. */
    @Value("${kafka.dataacquisition.topic}")
    private String topic;

    /**
     * Returns the topic name injected into this configuration.
     *
     * @return the value of {@code kafka.dataacquisition.topic}
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Creates the consumer factory bean.
     *
     * <p>The factory is configured with the injected broker list and uses
     * {@code StringDeserializer} for both keys and values. The consumer
     * group id is set to the topic name.
     *
     * @return a consumer factory for String/String records
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // The topic name doubles as the consumer group id.
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, this.topic);
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * Creates the listener container factory bean and attaches
     * {@link #consumerFactory()} to it.
     *
     * @return the listener container factory for String/String records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, String> source = consumerFactory();
        containerFactory.setConsumerFactory(source);
        return containerFactory;
    }
}
4.2.Consumer
/**
 * Component that listens on the topic named by the
 * {@code kafka.dataacquisition.topic} property and passes each message,
 * converted to a {@code Merchandise}, to the data handle service.
 */
@Component
public class Consumer {

    /** Class-wide logger. */
    private static final Logger logger = LogManager.getLogger(Consumer.class);

    /** Service that processes each converted message; injected by the container. */
    @Autowired
    private DataHandleService dataHandleService;

    /**
     * Handles one message from the subscribed topic.
     *
     * <p>The raw text is logged at info level, converted with
     * {@code JsonUtils.jsonToObject} into a {@code Merchandise}, and the
     * result is passed to {@code dataHandleService.handle}.
     *
     * @param message the message payload as text
     */
    @KafkaListener(topics = "${kafka.dataacquisition.topic}")
    public void receive(String message) {
        String received = "接收到kafka訊息系統的message:" + message;
        logger.info(received);

        Merchandise parsed = JsonUtils.jsonToObject(message, Merchandise.class);
        this.dataHandleService.handle(parsed);
    }
}