Kafka Source Code Deep Dive, Part 9 - Consumer - Internal Structure of SubscriptionState
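As mentioned earlier, one important component of KafkaConsumer is SubscriptionState, which maintains the consumer's consumption state. This article analyzes its internal structure.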
Two subscription strategies
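As covered in Part 1, a consumer can specify for itself which partitions to consume instead of letting the consumer leader assign them automatically. This corresponds to calling
KafkaConsumer::assign(List<TopicPartition> partitions).
The other strategy is to call subscribe and specify only the topics to consume; partitions are then assigned automatically through the coordinator protocol described earlier.
The structure of SubscriptionState below reflects these two strategies: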
public class SubscriptionState {
// all topics this consumer has subscribed to
private final Set<String> subscription;
// topics subscribed to by all consumers in this consumer's group; only useful to the consumer leader
private final Set<String> groupSubscription;
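// strategy 1: the consumer specifies partitions manually, so this field is non-empty
// strategy 2: the consumer leader assigns partitions automatically, so this field is empty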
private final Set<TopicPartition> userAssignment;
// once partitions are assigned, this field tracks the consumption state of each partition (needed by both strategy 1 and strategy 2)
private final Map<TopicPartition, TopicPartitionState> assignment;
...
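One key point here: strategy 1 and strategy 2 are mutually exclusive. If you call assign and then call subscribe, an IllegalStateException is thrown right away: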
public void subscribe(List<String> topics, ConsumerRebalanceListener listener)
{
if (listener == null) throw new IllegalArgumentException("RebalanceListener cannot be null");
if (!this.userAssignment.isEmpty() || this.subscribedPattern != null) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); // key point: subscribe(topics) is rejected after assign() or a pattern subscription
this.listener = listener;
changeSubscription(topics);
}
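To see the mutual exclusion in isolation, here is a minimal sketch. SubscriptionModeSketch is a hypothetical, stripped-down stand-in for SubscriptionState: partitions are plain strings, the pattern subscription is left out, and the reverse check inside assign is an assumption added for symmetry (the excerpt above only shows the check in subscribe).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the mutual-exclusion check; not the real Kafka class.
class SubscriptionModeSketch {
    static final String MESSAGE = "Subscription to topics and manual partition assignment are mutually exclusive";

    private final Set<String> subscription = new HashSet<>();   // strategy 2: subscribed topics
    private final Set<String> userAssignment = new HashSet<>(); // strategy 1: partitions, written as "topic-partition"

    // Strategy 1: the caller picks partitions. Rejected if topics were subscribed (assumed check).
    void assign(Collection<String> partitions) {
        if (!subscription.isEmpty()) throw new IllegalStateException(MESSAGE);
        userAssignment.clear();
        userAssignment.addAll(partitions);
    }

    // Strategy 2: the caller names topics only. Rejected if partitions were assigned manually.
    void subscribe(Collection<String> topics) {
        if (!userAssignment.isEmpty()) throw new IllegalStateException(MESSAGE);
        subscription.clear();
        subscription.addAll(topics);
    }

    // Returns true if calling subscribe after assign throws IllegalStateException.
    static boolean subscribeAfterAssignFails() {
        SubscriptionModeSketch state = new SubscriptionModeSketch();
        state.assign(Arrays.asList("orders-0", "orders-1"));
        try {
            state.subscribe(Arrays.asList("orders"));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("subscribe after assign fails: " + subscribeAfterAssignFails());
    }
}
```

Keeping both sets in one object and checking the other set on every call means the state can never be in both modes at once, which is what lets the rest of the consumer treat userAssignment being empty as "partitions come from the group".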
Two offsets
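As discussed earlier, a TopicPartition actually has two offsets: the offset to consume next (used at poll time) and the offset that has already been committed.
Accordingly, the TopicPartitionState structure above has two fields: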
// field in SubscriptionState
private final Map<TopicPartition, TopicPartitionState> assignment;
// internal structure of TopicPartitionState
private static class TopicPartitionState {
private Long position; // field 1: the offset to consume next
private OffsetAndMetadata committed; // field 2: the offset that has already been committed
...
}
public class OffsetAndMetadata implements Serializable {
private final long offset;
private final String metadata; // optional extra field; for example, the client can record which client made this commit and when
...
}
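To make the difference between the two fields concrete, here is a minimal sketch. PartitionOffsetsSketch and its methods are hypothetical names used only for illustration; it models a single partition and ignores the metadata string.

```java
// Hypothetical, simplified stand-in for TopicPartitionState; not the real Kafka class.
class PartitionOffsetsSketch {
    private Long position;  // field 1: next offset to consume; null until initialized
    private Long committed; // field 2: last committed offset; null until the first commit

    // Sets the starting position for consumption.
    void seek(long offset) {
        this.position = offset;
    }

    // Simulates handing `count` records to the application: only position moves.
    void consume(int count) {
        if (position == null) throw new IllegalStateException("position not initialized");
        position = position + count;
    }

    // Simulates a successful commit of the current position: only committed moves.
    void commit() {
        if (position == null) throw new IllegalStateException("position not initialized");
        committed = position;
    }

    Long position() { return position; }

    Long committed() { return committed; }

    public static void main(String[] args) {
        PartitionOffsetsSketch state = new PartitionOffsetsSketch();
        state.seek(100L);
        state.consume(5);
        System.out.println(state.position() + " / " + state.committed()); // 105 / null
        state.commit();
        state.consume(3);
        System.out.println(state.position() + " / " + state.committed()); // 108 / 105
    }
}
```

The gap between the two values (108 vs. 105 at the end) is the set of records that have been handed to the application but not yet committed; those are the ones that would be delivered again if consumption restarted from the committed offset.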
Field 1 is updated in fetchedRecords, step 3 of the Fetcher flow covered earlier:
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
if (this.subscriptions.partitionAssignmentNeeded()) {
return Collections.emptyMap();
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
throwIfOffsetOutOfRange();
throwIfUnauthorizedTopics();
throwIfRecordTooLarge();
for (PartitionRecords<K, V> part : this.records) {
if (!subscriptions.isAssigned(part.partition)) {
log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
continue;
}
long position = subscriptions.position(part.partition);
if (!subscriptions.isFetchable(part.partition)) {
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
} else if (part.fetchOffset == position) {
// key: compute the next offset
long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;
...
// update field 1 in SubscriptionState
subscriptions.position(part.partition, nextOffset);
} else {
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
part.partition, part.fetchOffset, position);
}
}
this.records.clear();
return drained;
}
}
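The core of the method above is the guard on part.fetchOffset == position followed by the "last offset + 1" update. A minimal sketch of just that logic follows. PositionAdvanceSketch is a hypothetical class, records are reduced to their offsets, and the empty-batch check is my own addition, since the original excerpt elides the lines between the two statements.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the position update in fetchedRecords; not Kafka code.
class PositionAdvanceSketch {
    private long position; // field 1: next offset to consume

    PositionAdvanceSketch(long initialPosition) {
        this.position = initialPosition;
    }

    // Applies one fetched batch. Returns the offsets handed to the caller,
    // or an empty list if the batch does not start at the current position.
    List<Long> drain(long fetchOffset, List<Long> recordOffsets) {
        if (fetchOffset != position || recordOffsets.isEmpty()) {
            // Stale batch (the position moved after the fetch was sent) or nothing to return.
            return Collections.emptyList();
        }
        // Next offset = offset of the last returned record + 1.
        long nextOffset = recordOffsets.get(recordOffsets.size() - 1) + 1;
        position = nextOffset;
        return recordOffsets;
    }

    long position() { return position; }

    public static void main(String[] args) {
        PositionAdvanceSketch state = new PositionAdvanceSketch(10L);
        System.out.println(state.drain(10L, Arrays.asList(10L, 11L, 12L))); // batch starts at the position
        System.out.println(state.position());                                // 13
        System.out.println(state.drain(10L, Arrays.asList(10L, 11L)));       // stale batch is ignored
        System.out.println(state.position());                                // 13
    }
}
```

The guard matters because the position can change between sending a fetch and processing its response, for example through a seek. In that case the fetched batch no longer starts where the consumer wants to read, so it is dropped rather than returned.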
Field 2 is, as you would expect, updated after a manual or automatic commit (both commit strategies were covered earlier).
Summary
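Together with Part 8, the consumer's strategy choices can be summarized as follows:
(1) assign vs. subscribe (manually specifying partitions vs. having partitions assigned automatically)
(2) Manually specifying the initial offset (seek) vs. obtaining the initial offset automatically (by sending an OffsetFetchRequest)
(3) Manual commit vs. automatic commit (AutoCommitTask)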
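Choice (3) is normally made through consumer configuration rather than code. A minimal sketch, assuming the standard consumer config keys enable.auto.commit and auto.commit.interval.ms; the helper name consumerProps and the 5000 ms interval are illustrative only, and connection settings such as the broker list are left out.

```java
import java.util.Properties;

// Hypothetical helper that builds only the commit-related part of a consumer configuration.
class CommitModeConfigSketch {
    static Properties consumerProps(boolean autoCommit) {
        Properties props = new Properties();
        // true: offsets are committed periodically in the background
        // false: the application must commit explicitly
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        if (autoCommit) {
            // The interval is only meaningful when auto commit is enabled (illustrative value).
            props.setProperty("auto.commit.interval.ms", "5000");
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(true));
        System.out.println(consumerProps(false));
    }
}
```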