基於Kafka 入門小案例-官網學習
阿新 • • 發佈:2018-12-03
首先Maven引入
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>
使用純 Java 傳送和接收訊息:
public class Test{ @Test public void testAutoCommit() throws Exception { logger.info("Start auto"); ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); final CountDownLatch latch = new CountDownLatch(4); containerProps.setMessageListener(new MessageListener<Integer, String>() { @Override public void onMessage(ConsumerRecord<Integer, String> message) { logger.info("received: " + message); latch.countDown(); } }); KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps); container.setBeanName("testAuto"); container.start(); Thread.sleep(1000); // wait a bit for the container to start KafkaTemplate<Integer, String> template = createTemplate(); template.setDefaultTopic(topic1); template.sendDefault(0, "foo"); template.sendDefault(2, "bar"); template.sendDefault(0, "baz"); template.sendDefault(2, "qux"); template.flush(); assertTrue(latch.await(60, TimeUnit.SECONDS)); container.stop(); logger.info("Stop auto"); } private KafkaMessageListenerContainer<Integer, String> createContainer( ContainerProperties containerProps) { Map<String, Object> props = consumerProps(); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container; } private KafkaTemplate<Integer, String> createTemplate() { Map<String, Object> senderProps = senderProps(); ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps); KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf); return template; } private Map<String, Object> consumerProps() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, group); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } private Map<String, Object> senderProps() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
基於Spring配置傳送和接收訊息
public class Test2{ @Autowired private Listener listener; @Autowired private KafkaTemplate<Integer, String> template; @Test public void testSimple() throws Exception { template.send("annotated1", 0, "foo"); template.flush(); assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS)); } @Configuration @EnableKafka public class Config { @Bean ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); //... return props; } @Bean public Listener listener() { return new Listener(); } @Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); //... return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { return new KafkaTemplate<Integer, String>(producerFactory()); } } } public class Listener { private final CountDownLatch latch1 = new CountDownLatch(1); @KafkaListener(id = "foo", topics = "annotated1") public void listen1(String foo) { this.latch1.countDown(); } }
基於Spring Boot的傳送和接收訊息
@SpringBootApplication public class Test3 implements CommandLineRunner { public static Logger logger = LoggerFactory.getLogger(Test3.class); public static void main(String[] args) { SpringApplication.run(Test3.class, args).close(); } @Autowired private KafkaTemplate<String, String> template; private final CountDownLatch latch = new CountDownLatch(3); @Override public void run(String... args) throws Exception { this.template.send("myTopic", "foo1"); this.template.send("myTopic", "foo2"); this.template.send("myTopic", "foo3"); latch.await(60, TimeUnit.SECONDS); logger.info("All received"); } @KafkaListener(topics = "myTopic") public void listen(ConsumerRecord<?, ?> cr) throws Exception { logger.info(cr.toString()); latch.countDown(); } }
在application.properties中加入
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest