Spring boot 下使用Kafka例項
阿新 • • 發佈:2019-01-30
一個簡單的例項
步驟:
1. 新增pom
2. 編寫配置檔案(生產者,消費者)
3. 測試
1. 新增pom###
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. 編寫配置檔案###
生產者配置
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.25.171:9092");
// 如果請求失敗,生產者會自動重試,我們指定是0次,如果啟用重試,則會有重複訊息的可能性
props.put(ProducerConfig.RETRIES_CONFIG, 0);
/**
* Server完成 producer request 前需要確認的數量。 acks=0時,producer不會等待確認,直接新增到socket等待發送;
* acks=1時,等待leader寫到local log就行; acks=all或acks=-1時,等待isr中所有副本確認 (注意:確認都是 broker
* 接收到訊息放入記憶體就直接返回確認,不是需要等待資料寫入磁碟後才返回確認,這也是kafka快的原因)
*/
// props.put("acks", "all");
/**
* Producer可以將發往同一個Partition的資料做成一個Produce
* Request傳送請求,即Batch批處理,以減少請求次數,該值即為每次批處理的大小。
* 另外每個Request請求包含多個Batch,每個Batch對應一個Partition,且一個Request傳送的目的Broker均為這些partition的leader副本。
* 若將該值設為0,則不會進行批處理
*/
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);//
/**
* 預設緩衝可立即傳送,即遍緩衝空間還沒有滿,但是,如果你想減少請求的數量,可以設定linger.ms大於0。
* 這將指示生產者傳送請求之前等待一段時間,希望更多的訊息填補到未滿的批中。這類似於TCP的演算法,例如上面的程式碼段,
* 可能100條訊息在一個請求傳送,因為我們設定了linger(逗留)時間為1毫秒,然後,如果我們沒有填滿緩衝區,
* 這個設定將增加1毫秒的延遲請求以等待更多的訊息。 需要注意的是,在高負載下,相近的時間一般也會組成批,即使是
* linger.ms=0。在不處於高負載的情況下,如果設定比0大,以少量的延遲代價換取更少的,更有效的請求。
*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
/**
* 控制生產者可用的快取總量,如果訊息傳送速度比其傳輸到伺服器的快,將會耗盡這個快取空間。
* 當快取空間耗盡,其他傳送呼叫將被阻塞,阻塞時間的閾值通過max.block.ms設定, 之後它將丟擲一個TimeoutException。
*/
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
消費者配置
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import com.example.demo.mq.Consumer;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(5000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.25.171:9092");
/**
* 如果設定成true,偏移量由auto.commit.interval.ms控制自動提交的頻率。
*
* 如果設定成false,不需要定時的提交offset,可以自己控制offset,當訊息認為已消費過了,這個時候再去提交它們的偏移量。
* 這個很有用的,當消費的訊息結合了一些處理邏輯,這個訊息就不應該認為是已經消費的,直到它完成了整個處理。
*/
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//提交延遲毫秒數
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
//執行超時時間
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//組ID
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
/**
* 在consumter端配置檔案中(或者是ConsumerConfig類引數)有個"autooffset.reset"(在kafka 0.8版本中為auto.offset.reset),
* 有2個合法的值"largest"/"smallest",預設為"largest",此配置引數表示當此groupId下的消費者,在ZK中沒有offset值時(比如新的groupId,或者是zk資料被清空),
* consumer應該從哪個offset開始消費.largest表示接受接收最大的offset(即最新訊息),smallest表示最小offset,即從topic的開始位置消費所有訊息.
*/
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return propsMap;
}
@Bean
public Consumer listener() {
return new Consumer();
}
}
註解寫的已經很清楚了
3. 測試###
生產者
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
@EnableScheduling
public class Producer {
@Autowired
private KafkaTemplate<?, String> kafkaTemplate;
@Scheduled(fixedDelay = 5000)
public void send() {
log.info("生產者 :{}",
"gaha_hero" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
kafkaTemplate.send("test-topic",
"gaha_hero" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
//send引數 1:topic,2:key,3:引數
// 預設情況下,Kafka根據傳遞訊息的key來進行分割槽的分配,即hash(key) % numPartitions,沒有指定key就隨機分配一個分割槽
}
}
消費者
import org.springframework.kafka.annotation.KafkaListener;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Consumer {
@KafkaListener(topics = {"test-topic"})
public void consumer(String message){
log.info("消費者: {}", message);
}
}