kafka多執行緒消費及處理和手動提交處理方案設計
kafka與其他訊息佇列不同的是, kafka的消費者狀態由外部( 消費者本身或者類似於Zookeeper之類的外部儲存 )進行維護, 所以kafka的消費就更加靈活, 但是也帶來了很多的問題, 因為客戶端消費超時被判定掛掉而消費者重新分配分割槽, 導致重複消費, 或者客戶端掛掉而導致重複消費等問題.
本文內容簡介
kafka的消費者有很多種不同的用法及模型. * 本文著重探討0.9版本及之後的kafka新consumer API的手動提交和多執行緒的使用* . 對於外部儲存offset, 手動偏移設定, 以及手動分割槽分配等不同消費者方案, 將在其他文章中介紹.
消費者在單執行緒下的使用
下面介紹單執行緒情況下自動提交和手動提交的兩種消費者
1. 自動提交, 單執行緒poll, 然後消費
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("group.id", "autoCommitGroup");
//自動提交
props.put("enable.auto.commit", "true");
//自動提交時間間隔
props.put("auto.commit.interval.ms" , "1000");
//key和value的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
offset自動提交會讓人產生誤會, 其實並不是在後臺提交, 而是在poll時才會進行offset提交.
2. 手動提交, 單執行緒poll, 讀取一定量的資料後才提交offset
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("group.id", "manualOffsetControlTest");
//手動提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
//每次處理200條訊息後才提交
final int minBatchSize = 200;
//用於儲存訊息的list
ArrayList<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
//如果讀取到的訊息滿了200條, 就進行處理
if (buffer.size() >= minBatchSize) {
doSomething(buffer);
//處理完之後進行提交
consumer.commitAsync();
//清除list, 繼續接收
buffer.clear();
}
}
新kafka消費者的版本特性
在接下來的探討之前, 需要簡單介紹一下kafka消費者的特性.
- kafka的0.9版本中重寫了consumer API
- consumer維護了消費者當前消費狀態, 不是執行緒安全的
- 新的consumer基於單執行緒模型, offset自動提交在poll方法中進行, 0.9–0.10.0.1, 客戶端的心跳也是在poll中進行, 在0.10.1.0版本中, 客戶端心跳在後臺非同步傳送了
- 0.9版本不能設定每回poll返回的最大資料量, 所以poll一次會返回上一次消費位置到最新位置的資料, 或者最大的資料量. 在0.10.0.1版本及之後, 可以通過在consumer的props中設定max.poll.records來限制每回返回的最大資料條數.
我的設計
我所使用的kafka版本是0.10.0.1, 所以使用的是新版本的consumer API, 可以限制每回返回的最大資料條數, 但是心跳和自動提交都是在poll中進行的.
為了防止前面單執行緒中, 因為訊息處理時間過長, poll的時間間隔很長, 導致不能及時在poll傳送心跳, 且offset也不能提交, 客戶端被超時被判斷為掛掉, 未提交offset的訊息會被其他消費者重新消費.
我的設計:
- 首先使用max.poll.records來限制每次poll返回的最大訊息量
- 將訊息的poll和訊息的處理分隔開, 儘快的poll, 以傳送心跳
- 每個處理執行緒只負責一個分割槽的處理, 當處理到一定的數量或者距離上一次處理一定的時間間隔後, 由poll執行緒進行提交offset.
程式碼架構如下圖所示:
假設有兩個消費者執行緒MsgReceiver, 分別分到了分割槽1和分割槽2, 分割槽3和分割槽4
- 有多個消費者執行緒, 在while迴圈中poll訊息
- 消費者根據分割槽將訊息交給對應的record_processor執行緒進行處理, 即一個record_processor執行緒只處理一個分割槽的訊息
- record_processor處理執行緒處理了一定條數的訊息或者距離上一次處理訊息過去一定時間後, 將當前分割槽的偏移量放至到consumer_queue中
- 消費者record_processor在poll前先讀取commit_queue中的內容, 如果有的話, 則提交當中的偏移資訊到kafka. 然後繼續poll訊息
程式碼實現
1. 消費者任務 MsgReceiver
public class MsgReceiver implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue = new LinkedBlockingQueue<>();
private Map<String, Object> consumerConfig;
private String alarmTopic;
private ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks;
private ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads;
public MsgReceiver(Map<String, Object> consumerConfig, String alarmTopic,
ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks,
ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads) {
this.consumerConfig = consumerConfig;
this.alarmTopic = alarmTopic;
this.recordProcessorTasks = recordProcessorTasks;
this.recordProcessorThreads = recordProcessorThreads;
}
@Override
public void run() {
//kafka Consumer是非執行緒安全的,所以需要每個執行緒建立一個consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
consumer.subscribe(Arrays.asList(alarmTopic));
//檢查執行緒中斷標誌是否設定, 如果設定則表示外界想要停止該任務,終止該任務
try {
while (!Thread.currentThread().isInterrupted()) {
try {
//檢視該消費者是否有需要提交的偏移資訊, 使用非阻塞讀取
Map<TopicPartition, OffsetAndMetadata> toCommit = commitQueue.poll();
if (toCommit != null) {
logger.debug("commit TopicPartition offset to kafka: " + toCommit);
consumer.commitSync(toCommit);
}
//最多輪詢100ms
ConsumerRecords<String, String> records = consumer.poll(100);
if (records.count() > 0) {
logger.debug("poll records size: " + records.count());
}
for (final ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
TopicPartition topicPartition = new TopicPartition(topic, partition);
RecordProcessor processTask = recordProcessorTasks.get(topicPartition);
//如果當前分割槽還沒有開始消費, 則就沒有消費任務在map中
if (processTask == null) {
//生成新的處理任務和執行緒, 然後將其放入對應的map中進行儲存
processTask = new RecordProcessor(commitQueue);
recordProcessorTasks.put(topicPartition, processTask);
Thread thread = new Thread(processTask);
thread.setName("Thread-for " + topicPartition.toString());
logger.info("start Thread: " + thread.getName());
thread.start();
recordProcessorThreads.put(topicPartition, thread);
}
//將訊息放到
processTask.addRecordToQueue(record);
}
} catch (Exception e) {
e.printStackTrace();
logger.warn("MsgReceiver exception " + e + " ignore it");
}
}
} finally {
consumer.close();
}
}
}
2. 訊息處理任務 RecordProcessor
public class RecordProcessor implements Runnable {
private static Logger logger = LoggerFactory.getLogger(RecordProcessor.class);
//儲存MsgReceiver執行緒傳送過來的訊息
private BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>();
//用於向consumer執行緒提交消費偏移的佇列
private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue;
//上一次提交時間
private LocalDateTime lastTime = LocalDateTime.now();
//消費了20條資料, 就進行一次提交
private long commitLength = 20L;
//距離上一次提交多久, 就提交一次
private Duration commitTime = Duration.standardSeconds(2);
//當前該執行緒消費的資料條數
private int completeTask = 0;
//儲存上一條消費的資料
private ConsumerRecord<String, String> lastUncommittedRecord;
//用於儲存消費偏移量的queue, 由MsgReceiver提供
public RecordProcessor(BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue) {
this.commitQueue = commitQueue;
}
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
//有時間限制的poll, consumer傳送消費過來的佇列. 每個處理執行緒都有自己的佇列.
ConsumerRecord<String, String> record = queue.poll(100, TimeUnit.MICROSECONDS);
if (record != null) {
//處理過程
process(record);
//完成任務數加1
this.completeTask++;
//儲存上一條處理記錄
lastUncommittedRecord = record;
}
//提交偏移給consumer
commitToQueue();
}
} catch (InterruptedException e) {
//執行緒被interrupt,直接退出
logger.info(Thread.currentThread() + "is interrupted");
}
}
private void process(ConsumerRecord<String, String> record) {
System.out.println(record);
}
//將當前的消費偏移量放到queue中, 由MsgReceiver進行提交
private void commitToQueue() throws InterruptedException {
//如果沒有消費或者最後一條消費資料已經提交偏移資訊, 則不提交偏移資訊
if (lastUncommittedRecord == null) {
return;
}
//如果消費了設定的條數, 比如又消費了commitLength訊息
boolean arrivedCommitLength = this.completeTask % commitLength == 0;
//獲取當前時間, 看是否已經到了需要提交的時間
LocalDateTime currentTime = LocalDateTime.now();
boolean arrivedTime = currentTime.isAfter(lastTime.plus(commitTime));
//如果消費了設定條數, 或者到了設定時間, 那麼就傳送偏移到消費者, 由消費者非阻塞poll這個偏移資訊佇列, 進行提交
if (arrivedCommitLength || arrivedTime) {
lastTime = currentTime;
long offset = lastUncommittedRecord.offset();
int partition = lastUncommittedRecord.partition();
String topic = lastUncommittedRecord.topic();
TopicPartition topicPartition = new TopicPartition(topic, partition);
logger.debug("partition: " + topicPartition + " submit offset: " + (offset + 1L) + " to consumer task");
Map<TopicPartition, OffsetAndMetadata> map = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1L));
commitQueue.put(map);
//置空
lastUncommittedRecord = null;
}
}
//consumer執行緒向處理執行緒的佇列中新增record
public void addRecordToQueue(ConsumerRecord<String, String> record) {
try {
queue.put(record);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3. 管理物件
負責啟動消費者執行緒MsgReceiver, 儲存消費者執行緒MsgReceiver, 儲存處理任務和執行緒RecordProcessor, 以及銷燬這些執行緒
public class KafkaMultiProcessorTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaMultiProcessor.class);
//訂閱的topic
private String alarmTopic;
//brokers地址
private String servers;
//消費group
private String group;
//kafka消費者配置
private Map<String, Object> consumerConfig;
private Thread[] threads;
//儲存處理任務和執行緒的map
private ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks = new ConcurrentHashMap<>();
private ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads = new ConcurrentHashMap<>();
public static void main(String[] args) {
KafkaMultiProcessorTest test = new KafkaMultiProcessorTest();
//....省略設定topic和group的程式碼
test.init();
}
public void init() {
consumerConfig = getConsumerConfig();
logger.debug("get kafka consumerConfig: " + consumerConfig.toString());
//建立threadsNum個執行緒用於讀取kafka訊息, 且位於同一個group中, 這個topic有12個分割槽, 最多12個consumer進行讀取
int threadsNum = 3;
logger.debug("create " + threadsNum + " threads to consume kafka warn msg");
threads = new Thread[threadsNum];
for (int i = 0; i < threadsNum; i++) {
MsgReceiver msgReceiver = new MsgReceiver(consumerConfig, alarmTopic, recordProcessorTasks, recordProcessorThreads);
Thread thread = new Thread(msgReceiver);
threads[i] = thread;
thread.setName("alarm msg consumer " + i);
}
//啟動這幾個執行緒
for (int i = 0; i < threadsNum; i++) {
threads[i].start();
}
logger.debug("finish creating" + threadsNum + " threads to consume kafka warn msg");
}
//銷燬啟動的執行緒
public void destroy() {
closeRecordProcessThreads();
closeKafkaConsumer();
}
private void closeRecordProcessThreads() {
logger.debug("start to interrupt record process threads");
for (Map.Entry<TopicPartition, Thread> entry : recordProcessorThreads.entrySet()) {
Thread thread = entry.getValue();
thread.interrupt();
}
logger.debug("finish interrupting record process threads");
}
private void closeKafkaConsumer() {
logger.debug("start to interrupt kafka consumer threads");
//使用interrupt中斷執行緒, 線上程的執行方法中已經設定了響應中斷訊號
for (int i = 0; i < threads.length; i++) {
threads[i].interrupt();
}
logger.debug("finish interrupting consumer threads");
}
//kafka consumer配置
private Map<String, Object> getConsumerConfig() {
return ImmutableMap.<String, Object>builder()
.put("bootstrap.servers", servers)
.put("group.id", group)
.put("enable.auto.commit", "false")
.put("session.timeout.ms", "30000")
.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.put("max.poll.records", 1000)
.build();
}
public void setAlarmTopic(String alarmTopic) {
this.alarmTopic = alarmTopic;
}
public void setServers(String servers) {
this.servers = servers;
}
public void setGroup(String group) {
this.group = group;
}
}
不足
上面的程式碼還有不足, 可以看到處理任務和執行緒是儲存在map中, 如果consumer因為有新機器的上線而重新分配分割槽, 而被剝奪了某個分割槽的消費, 對應的處理任務和執行緒並沒有進行響應的銷燬. 所以我們使用org.apache.kafka.clients.consumer.ConsumerRebalanceListener來對分割槽的調整進行響應.