Receiving Kafka messages exactly once
– Start
The example below stores the consumer offset in the database and updates it in the same transaction that stores the received messages, which gives exactly-once semantics on the consumer side. Because the message inserts and the offset update commit (or roll back) together, a crash can neither lose a message nor store it twice: after a restart the consumer simply seeks back to the offset recorded in the database.
Create the tables
CREATE TABLE KAFKA_OFFSET (
    TOPIC     VARCHAR2(100),
    PARTITION NUMBER(3, 0),
    OFFSET    NUMBER(18, 0)
);

INSERT INTO KAFKA_OFFSET VALUES ('topic0', 0, 0);
COMMIT;

CREATE TABLE KAFKA_MESSAGE (
    TOPIC     VARCHAR2(100),
    PARTITION NUMBER(3, 0),
    OFFSET    NUMBER(18, 0),
    KEY       VARCHAR2(100),
    MESSAGE   VARCHAR2(100)
);
Consumer code
package shangbo.kafka.example6;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class App {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
        Service service = context.getBean(Service.class);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "ConsumerGroup3"); // consumer group id
        props.put("enable.auto.commit", "false"); // offsets are managed manually (kept in the database), never auto-committed

        // create the consumer that reads from topic0
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // read the saved offsets from the database, assign the partitions and seek to them
        List<PartitionOffset> partitionOffsets = service.queryPartitionOffset("topic0");
        consumer.assign(partitionOffsets.stream().map(p -> new TopicPartition("topic0", p.getPartition())).collect(Collectors.toList()));
        for (PartitionOffset partitionOffset : partitionOffsets) {
            TopicPartition tp = new TopicPartition("topic0", partitionOffset.getPartition());
            consumer.seek(tp, partitionOffset.getOffset());
        }

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            service.process(records);
        }
    }
}
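The while (true) loop above never closes the consumer. Below is a minimal graceful-shutdown sketch of that loop; it is an addition to the original example and assumes the same consumer and service variables from the main method, plus an extra import of org.apache.kafka.common.errors.WakeupException.

// Sketch only (not part of the original post): wakeup() makes a blocked poll()
// throw WakeupException, so the consumer can be closed cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.wakeup()));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        service.process(records);
    }
} catch (WakeupException e) {
    // expected when the shutdown hook calls wakeup()
} finally {
    consumer.close();
}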
package shangbo.kafka.example6;

import javax.sql.DataSource;

import org.apache.commons.dbcp.BasicDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableTransactionManagement // enable Spring transaction management
public class AppConfig {
    @Bean(destroyMethod = "close")
    public BasicDataSource dataSource() {
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        dataSource.setUrl("jdbc:oracle:thin:@localhost:1521:xe");
        dataSource.setUsername("hr");
        dataSource.setPassword("123456");
        return dataSource;
    }

    @Bean
    public DataSourceTransactionManager txManager(DataSource dataSource) {
        DataSourceTransactionManager txManager = new DataSourceTransactionManager();
        txManager.setDataSource(dataSource);
        return txManager;
    }

    @Bean
    public Service service(DataSource dataSource) {
        Service service = new ServiceImpl();
        service.setDataSource(dataSource);
        return service;
    }
}
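Note (not spelled out in the original post): besides kafka-clients, this configuration assumes spring-context, spring-jdbc, spring-tx, commons-dbcp and the Oracle JDBC driver are on the classpath, since it uses AnnotationConfigApplicationContext, JdbcTemplate, DataSourceTransactionManager, @Transactional and BasicDataSource.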
package shangbo.kafka.example6;
import java.util.List;
import javax.sql.DataSource;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public interface Service {
    // read the last saved offset for every partition of the given topic
    List<PartitionOffset> queryPartitionOffset(String topic);

    // store the received messages and the new offsets in one database transaction
    void process(ConsumerRecords<String, String> records);

    void setDataSource(DataSource dataSource);
}
package shangbo.kafka.example6;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
public class ServiceImpl implements Service {
    private JdbcTemplate jdbcTemplate;

    @Transactional(readOnly = true)
    public List<PartitionOffset> queryPartitionOffset(String topic) {
        String sql = "select * from kafka_offset where topic = ?";
        return jdbcTemplate.query(sql, new Object[] { topic }, new BeanPropertyRowMapper<PartitionOffset>(PartitionOffset.class));
    }

    @Transactional
    public void process(ConsumerRecords<String, String> records) {
        Set<TopicPartition> topicPartitions = records.partitions();

        for (TopicPartition tp : topicPartitions) {
            final List<ConsumerRecord<String, String>> recs = records.records(tp);

            // store the messages
            String insertKafkaMsgSql = "insert into KAFKA_MESSAGE values (?, ?, ?, ?, ?)";
            jdbcTemplate.batchUpdate(insertKafkaMsgSql, new BatchPreparedStatementSetter() {
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    ps.setString(1, recs.get(i).topic());
                    ps.setInt(2, recs.get(i).partition());
                    ps.setLong(3, recs.get(i).offset());
                    ps.setString(4, recs.get(i).key());
                    ps.setString(5, recs.get(i).value());
                }

                public int getBatchSize() {
                    return recs.size();
                }
            });

            // store the offset to resume from (largest processed offset + 1)
            Long maxOffset = recs.stream().mapToLong(ConsumerRecord::offset).max().getAsLong();
            maxOffset += 1;
            String updateOffsetSql = "update kafka_offset set offset = ? where topic = ? and partition = ?";
            jdbcTemplate.update(updateOffsetSql, maxOffset, tp.topic(), tp.partition());
        }
    }

    //
    // Setter
    //
    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }
}
package shangbo.kafka.example6;
// simple bean mapped from the KAFKA_OFFSET table by BeanPropertyRowMapper
public class PartitionOffset {
    private Integer partition;
    private Long offset;

    public Integer getPartition() {
        return partition;
    }

    public void setPartition(Integer partition) {
        this.partition = partition;
    }

    public Long getOffset() {
        return offset;
    }

    public void setOffset(Long offset) {
        this.offset = offset;
    }
}
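For a quick test, a simple producer can fill topic0. The class below is a hypothetical helper, not part of the original post; it only assumes the same broker address used by the consumer above.

package shangbo.kafka.example6;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical test helper (an addition, not in the original post):
// sends a few string messages to topic0 so the consumer has data to store.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("topic0", "key" + i, "value" + i));
        }
        producer.close(); // flushes pending sends before closing
    }
}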
– Notice: please credit the source when reposting.
– Last Edited on 2018-06-13
– Written by ShangBo on 2018-06-13
– End