Two Ways to Use Kafka with Spring Boot 2
阿新 • Published: 2020-10-28
Integrating Kafka with Spring Boot 2
Kafka is a distributed message queue, widely used in real projects. It offers high performance, persistence, multi-replica backup, and horizontal scalability.
Kafka
- Kafka is deployed on several machines to form a cluster; each running Kafka instance is called a broker.
- A topic can be split into several partitions; at the storage level, each partition is an append-only log file.
- Partitions give Kafka its scalability and horizontal-scaling ability (a topic-creation sketch follows below).
- The multi-replica mechanism (partition replicas) improves Kafka's data reliability and disaster tolerance.
(Figure from 《深入理解kafka核心設計和實踐原理》)
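To make topics, partitions, and replicas concrete, here is a minimal sketch (not from the original article) that creates a topic with the Java AdminClient. The broker address and topic name mirror the ones used later in this post; two partitions match the two consumer threads used in the client-based consumer below.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; the same one the Docker setup below exposes
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.111:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions for horizontal scaling; replication factor 1 because the
            // single-broker Docker setup below cannot host extra replicas
            NewTopic topic = new NewTopic("TOPIC_LIN_LIANG", 2, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}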
Setting up the environment with Docker
1. Install ZooKeeper and Kafka
- The wurstmeister/kafka and wurstmeister/zookeeper images are used here.
2. Run the images
- Startup hit roughly eight errors, which were fixed one by one until it finally ran. A few of them:
- Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME
- WARN Session 0x0 for server zookeeper:2181, unexpected error, closing socket
- java.nio.channels.UnresolvedAddressException
- could not be established. Broker may not be available
- Give up sending metadata request since no node is available
- To sum up, the final commands, starting ZooKeeper first and then Kafka:
docker run --name zk01 -p 2181:2181 --restart always -d zookeeper
docker run --name kafka01 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT="192.168.0.111:2181" -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.111:9092 -p 9092:9092 --link zk01:zk -d wurstmeister/kafka
Adding Kafka to Spring Boot 2
<!-- Kafka dependency -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- application.properties configuration:
# Kafka configuration
spring.kafka.bootstrap-servers=192.168.0.111:9092
#=============== producer =======================
spring.kafka.producer.retries=0
# Maximum size, in bytes, of each batch of messages sent
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Annotation-based usage
Producer
@Component
public class KafkaProducer {

    private static final String MY_TOPIC = "TOPIC_LIN_LIANG";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void produce() {
        Message message = new Message();
        message.setId(12L);
        message.setMsg("hello jack");
        message.setTime(new Date());
        // The configured value serializer is StringSerializer, so send a String payload
        kafkaTemplate.send(MY_TOPIC, message.toString());
    }
}
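The Message class is not shown in the original post; a minimal sketch of what it presumably looks like, with toString() producing the String payload that the configured StringSerializer expects:

import java.util.Date;

public class Message {
    private Long id;
    private String msg;
    private Date time;

    public void setId(Long id) { this.id = id; }
    public void setMsg(String msg) { this.msg = msg; }
    public void setTime(Date time) { this.time = time; }

    @Override
    public String toString() {
        // Serialized form sent to Kafka, since the value serializer is StringSerializer
        return "Message{id=" + id + ", msg='" + msg + "', time=" + time + "}";
    }
}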
Consumer
@KafkaListener methods can accept the following parameters:
- data: the type of data is not strictly limited; it is determined by how the listener container deserializes the record. When data is a List, the method consumes in batches.
- ConsumerRecord: the full consumer record, including headers, partition info, timestamp, etc.
- Acknowledgment: the interface used for manual acknowledgment (the ack mechanism)
- Consumer: the consumer itself; with it you can commit offsets manually, control the consumption rate, and so on (a manual-ack sketch follows the signature list below)
public void listen1(String data)
public void listen2(ConsumerRecord<K,V> data)
public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment)
public void listen4(ConsumerRecord<K,V> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)
public void listen5(List<String> data)
public void listen6(List<ConsumerRecord<K,V>> data)
public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment)
public void listen8(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment, Consumer<K,V> consumer)
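As an example of the listen3 shape above, here is a hedged sketch of manual acknowledgment. It assumes spring.kafka.consumer.enable-auto-commit=false and a listener container factory configured with AckMode.MANUAL (named manualAckFactory here purely for illustration); neither is part of the configuration shown earlier.

@KafkaListener(topics = "TOPIC_LIN_LIANG", containerFactory = "manualAckFactory")
public void listen3(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
    System.out.println("partition " + data.partition() + ", offset " + data.offset() + ": " + data.value());
    // Commit the offset only after the record has been processed successfully
    acknowledgment.acknowledge();
}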
Usage example
@KafkaListener(topics = {MY_TOPIC})
public void consume(String message){
log.info("receive msg " + message);
}
Client-based usage
Since version 0.9.x the Kafka client has been written in Java, and I personally prefer this style of development.
The basic meaning of each setting is commented in the configuration below; for details, see Zhu Zhonghua's 《深入理解kafka:核心設計和實現原理》. For learning Kafka, this one book feels like enough.
/**
 * linliang
 */
@Configuration
public class Kafka_Config {

    // Defined separately from the spring.kafka.* properties,
    // e.g. kafka.broker.list=192.168.0.111:9092 in application.properties
    @Value("${kafka.broker.list}")
    public String brokerList;

    public static final String topic = "TOPIC_LIN_LIANG";
    public final String groupId = "group.01";

    public Properties consumerConfigs() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);     // automatic offset commits
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); // auto-commit interval
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);    // session timeout before the group considers the consumer dead
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");  // where to start when the offset is lost or out of range
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        return props;
    }

    public Properties producerConfigs() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 20000000); // ~20 MB message buffer
        // How long send() may block when the buffer is full (default 60 s); 6 s here
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // Producer retry count
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // Reusable ProducerBatch size (in the accumulator's BufferPool)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // The producer sends when a ProducerBatch fills up or LINGER_MS_CONFIG elapses
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

    @Bean
    public Producer<String, String> getKafkaProducer() {
        // KafkaProducer is thread-safe; a single instance can be shared across threads
        return new KafkaProducer<>(producerConfigs());
    }
}
Producer
@Component
public class Kafka_Producer {

    public String topic = Kafka_Config.topic;

    @Autowired
    Producer<String, String> producer;

    public void producer() throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
        try {
            // Asynchronous send; the callback runs once the broker acknowledges the record
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println(metadata.partition() + ":" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace(); // do not swallow send errors silently
        }
    }
}
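For comparison, a synchronous variant of the send above (a sketch, not in the original): blocking on the Future returned by send() surfaces errors immediately, at the cost of throughput. The 6-second MAX_BLOCK_MS configured earlier bounds how long send() itself may block.

// Inside the same try block, replacing the callback-style send:
RecordMetadata metadata = producer.send(record).get(); // blocks until acknowledged
System.out.println(metadata.partition() + ":" + metadata.offset());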
Consumer
@Component
public class Kafka_Consumer implements InitializingBean {

    public String topic = Kafka_Config.topic;

    @Autowired
    Kafka_Config kafka_config;

    @Override
    public void afterPropertiesSet() throws Exception {
        // One KafkaConsumer instance per thread (KafkaConsumer is not thread-safe);
        // set the thread count to the partition count to maximize consumption throughput
        int consumerThreadNum = 2;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(kafka_config.consumerConfigs(), topic).start();
        }
    }

    public class KafkaConsumerThread extends Thread {

        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic) {
            this.kafkaConsumer = new KafkaConsumer<>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                            kafkaConsumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("message------------ " + record.value());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace(); // log instead of silently swallowing consumer errors
            } finally {
                kafkaConsumer.close();
            }
        }
    }
}
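One caveat with the loop above: while (true) never exits normally, so close() is only reached on an error. A common pattern (sketched here, not from the original) is to call wakeup() from another thread and treat the resulting WakeupException as the shutdown signal; catch it in run() before the generic Exception handler and simply fall through to the finally block that closes the consumer.

// Add to KafkaConsumerThread; call from another thread (e.g. a @PreDestroy hook)
public void shutdown() {
    kafkaConsumer.wakeup(); // makes the blocked poll() throw WakeupException
}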
Welcome to follow the WeChat official account fbzl95.