Spring kafka 學習之二 採用java 配置類方式傳送與接收訊息
阿新 • • 發佈:2019-02-02
參考資料:https://docs.spring.io/spring-kafka/reference/html/_introduction.html#compatibility
spring-kafka 版本:2.1.5.release
1、配置類
package com.hdsxtech.kafkacustomer; import com.hdsxtech.kafkacustomer.listener.Listener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class Config { @Bean ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); // return props; props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.231:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public Listener listener() { return new Listener(); } @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); // return props; props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.231:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<Integer, String>(producerFactory()); } }
注:在注入bean cusmerConfig 與 producerConfig 實體的時候,修改了源文件的程式碼,由於源文件只有部分程式碼, 不知道去哪裡參考完整的程式碼, 所以我也不知道原來作者為什麼那樣寫,而且我修改之後的結構和原來的程式碼應該結構是不一樣的,但是他也工作,如果誰清楚其中的原因, 望告知, 不勝感激。
2、listener
package com.hdsxtech.kafkacustomer.listener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import java.util.concurrent.CountDownLatch; public class Listener { Logger logger = LoggerFactory.getLogger(Listener.class); public final CountDownLatch latch1 = new CountDownLatch(1); @KafkaListener(id = "foo", topics = "annotated1") public void listen1(String foo) { logger.info("foo:" + foo); this.latch1.countDown(); } }
3、測試類
package com.hdsxtech.kafkacustomer; import com.hdsxtech.kafkacustomer.listener.Listener; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.TimeUnit; import static junit.framework.TestCase.assertTrue; @RunWith(SpringRunner.class) @SpringBootTest public class TestSimple { @Autowired private Listener listener; @Autowired private KafkaTemplate<Integer, String> template; @Test public void testSimple() throws Exception { template.send("annotated1", 0, "fooaaaaaaaaaaaaaaaaaaa"); template.flush(); assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS)); } }
4、執行結果
5、通過spring boot 方式執行
package com.hdsxtech.kafkacustomer;
import jdk.management.resource.internal.inst.StaticInstrumentation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
public class KafkaCustomerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(KafkaCustomerApplication.class, args).close();
}
Logger logger = LoggerFactory.getLogger(KafkaCustomerApplication.class);
@Autowired
private KafkaTemplate<Integer, String> template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("myTopic", "foo1");
this.template.send("myTopic", "foo2");
this.template.send("myTopic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "myTopic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info(cr.toString());
latch.countDown();
}
}
注:原文件程式碼有誤
private KafkaTemplate<Integer, String> template; 非
private KafkaTemplate<String, String> template;