
Kafka Source Code: KafkaConsumer Analysis, Offset Operations

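The consumer commits its offsets both during normal consumption and before a Rebalance begins, recording its current consumption position. SubscriptionState tracks the consumption state of each TopicPartition with a TopicPartitionState, and the TopicPartitionState.position field records the offset of the next message the consumer will fetch from the server.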
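To make this bookkeeping concrete, here is a minimal sketch of per-partition state in the spirit of SubscriptionState and TopicPartitionState. The class names `SubscriptionSketch` and `PartitionState`, and the use of a plain `"topic-partition"` string in place of TopicPartition, are hypothetical simplifications for illustration, not Kafka's actual classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of SubscriptionState / TopicPartitionState.
public class SubscriptionSketch {
    // Per-partition state: where to fetch next and what was last committed.
    static final class PartitionState {
        Long position;   // offset of the next message to fetch (null = unknown)
        Long committed;  // last committed offset (null = nothing committed yet)
    }

    // Keyed by a "topic-partition" string instead of Kafka's TopicPartition.
    private final Map<String, PartitionState> assignment = new HashMap<>();

    void assign(String tp) { assignment.put(tp, new PartitionState()); }

    boolean isAssigned(String tp) { return assignment.containsKey(tp); }

    void seek(String tp, long offset) { assignment.get(tp).position = offset; }

    Long position(String tp) { return assignment.get(tp).position; }

    // After consuming records up to offset N, the next position is N + 1.
    void consumed(String tp, long lastOffset) { seek(tp, lastOffset + 1); }

    void committed(String tp, long offset) { assignment.get(tp).committed = offset; }

    Long committed(String tp) { return assignment.get(tp).committed; }

    public static void main(String[] args) {
        SubscriptionSketch s = new SubscriptionSketch();
        s.assign("orders-0");
        s.seek("orders-0", 100L);
        s.consumed("orders-0", 104L);   // processed offsets 100..104
        // Kafka commits the offset of the next message to consume, i.e. the position.
        s.committed("orders-0", s.position("orders-0"));
        System.out.println(s.position("orders-0") + " " + s.committed("orders-0")); // prints "105 105"
    }
}
```

Note that the committed value is the position (the next offset to read), not the offset of the last processed record, so a restarted consumer resumes exactly where it left off.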

public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        // set needsFetchCommittedOffsets to true
        this.subscriptions.needRefreshCommits();
        // create the OffsetCommitRequest and add it to the unsent queue
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        // choose the callback (fall back to the default one)
        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
        // add a listener that runs when the response arrives
        future.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);
                cb.onComplete(offsets, null);
            }

            @Override
            public void onFailure(RuntimeException e) {
                if (e instanceof RetriableException) {
                    cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
                } else {
                    cb.onComplete(offsets, e);
                }
            }
        });

        // send the request without allowing a wakeup() to interrupt the poll
        client.pollNoWakeup();
    }
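The listener pattern above can be sketched without any Kafka dependency. In this minimal sketch, `MiniFuture`, `Listener`, `commitAsync`, and the local `RetriableException` are hypothetical stand-ins for RequestFuture, RequestFutureListener, commitOffsetsAsync, and Kafka's own exception; the plain `RuntimeException` wrapper replaces RetriableCommitFailedException. It shows how a retriable failure is wrapped before reaching the user callback, while other failures are passed through unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-ins for RequestFuture / RequestFutureListener.
public class ListenerSketch {
    interface Listener<T> {
        void onSuccess(T value);
        void onFailure(RuntimeException e);
    }

    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    static final class MiniFuture<T> {
        private final List<Listener<T>> listeners = new ArrayList<>();
        private boolean done = false;

        void addListener(Listener<T> l) { listeners.add(l); }

        // Completes at most once (repeat calls are ignored in this simplified version).
        void complete(T value) {
            if (done) return;
            done = true;
            for (Listener<T> l : listeners) l.onSuccess(value);
        }

        void raise(RuntimeException e) {
            if (done) return;
            done = true;
            for (Listener<T> l : listeners) l.onFailure(e);
        }
    }

    // Mirrors commitOffsetsAsync: the callback gets null on success, and retriable
    // errors are wrapped so the caller knows that retrying the commit may succeed.
    static MiniFuture<Void> commitAsync(BiConsumer<String, Exception> callback) {
        MiniFuture<Void> future = new MiniFuture<>();
        future.addListener(new Listener<Void>() {
            @Override
            public void onSuccess(Void value) { callback.accept("committed", null); }

            @Override
            public void onFailure(RuntimeException e) {
                if (e instanceof RetriableException)
                    callback.accept("failed", new RuntimeException("retriable: " + e.getMessage(), e));
                else
                    callback.accept("failed", e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        MiniFuture<Void> f = commitAsync((status, err) ->
                System.out.println(status + (err == null ? "" : " (" + err.getMessage() + ")")));
        f.raise(new RetriableException("coordinator not available"));
        // prints "failed (retriable: coordinator not available)"
    }
}
```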

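This method sends the OffsetCommitRequest asynchronously. The synchronous version differs in two ways: it waits for the response by blocking in poll, and it retries when it detects a RetriableException. AutoCommitTask is a scheduled task that periodically calls commitOffsetsAsync to commit offsets automatically. Next, let's look at how the OffsetCommitResponse is handled: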

public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            sensors.commitLatency.record(response.requestLatencyMs());
            Set<String> unauthorizedTopics = new HashSet<>();
            // iterate over the result for every offset submitted for commit
            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset = offsetAndMetadata.offset();
                // get the error code
                Errors error = Errors.forCode(entry.getValue());
                if (error == Errors.NONE) {
                    log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
                    if (subscriptions.isAssigned(tp))
                        // update the local cache only if the partition is still assigned
                        subscriptions.committed(tp, offsetAndMetadata);
                } 
                // error-code handling
               ...

            if (!unauthorizedTopics.isEmpty()) {
                log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
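The core of this handler can be modeled as a pure function. In the minimal sketch below, `CommitResponseSketch`, the two-value `Err` enum, and the `"topic-partition"` string keys are hypothetical simplifications; Kafka's real Errors enum and the elided branches cover many more cases. It shows the two rules visible above: a successful offset updates the local committed cache only if the partition is still assigned, and authorization failures are collected by topic and reported together.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CommitResponseSketch {
    // Hypothetical, reduced error-code set.
    enum Err { NONE, TOPIC_AUTHORIZATION_FAILED }

    // Partition keys are "topic-partition" strings, e.g. "orders-0".
    static String topicOf(String tp) { return tp.substring(0, tp.lastIndexOf('-')); }

    // Returns the unauthorized topics; an empty set means the commit succeeded.
    static Set<String> handleCommitResponse(Map<String, Err> response,
                                            Map<String, Long> requestedOffsets,
                                            Set<String> assigned,
                                            Map<String, Long> committedCache) {
        Set<String> unauthorizedTopics = new HashSet<>();
        for (Map.Entry<String, Err> entry : response.entrySet()) {
            String tp = entry.getKey();
            long offset = requestedOffsets.get(tp);
            if (entry.getValue() == Err.NONE) {
                // update the local cache only if the partition is still assigned
                if (assigned.contains(tp))
                    committedCache.put(tp, offset);
            } else if (entry.getValue() == Err.TOPIC_AUTHORIZATION_FAILED) {
                unauthorizedTopics.add(topicOf(tp));
            }
        }
        return unauthorizedTopics;
    }

    public static void main(String[] args) {
        Map<String, Err> response = new HashMap<>();
        response.put("orders-0", Err.NONE);
        response.put("orders-1", Err.NONE);                     // revoked in the meantime
        response.put("audit-0", Err.TOPIC_AUTHORIZATION_FAILED);
        Map<String, Long> requested = new HashMap<>();
        requested.put("orders-0", 42L);
        requested.put("orders-1", 7L);
        requested.put("audit-0", 3L);
        Set<String> assigned = new HashSet<>();
        assigned.add("orders-0");
        assigned.add("audit-0");
        Map<String, Long> cache = new HashMap<>();
        Set<String> unauthorized = handleCommitResponse(response, requested, assigned, cache);
        System.out.println(cache + " unauthorized=" + unauthorized); // prints "{orders-0=42} unauthorized=[audit]"
    }
}
```

In the real handler a non-empty set fails the future with a TopicAuthorizationException; otherwise the future completes with null.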

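After the Rebalance completes, each consumer knows which partitions it must consume. Before it starts consuming, the consumer has to determine the position from which to start fetching messages. Suppose it previously committed its last consumed position to the GroupCoordinator, which stored it in Kafka's internal Offsets Topic. The consumer can then send an OffsetFetchRequest to retrieve the last committed offset and resume consuming from there.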

public void refreshCommittedOffsetsIfNeeded() {
        // check needsFetchCommittedOffsets
        if (subscriptions.refreshCommitsNeeded()) {
            // send an OffsetFetchRequest and handle the OffsetFetchResponse; the return value is the set of most recently committed offsets
            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
            // update the committed field of each partition
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                // verify assignment is still active
                if (subscriptions.isAssigned(tp))
                    this.subscriptions.committed(tp, entry.getValue());
            }
            // set needsFetchCommittedOffsets to false
            this.subscriptions.commitsRefreshed();
        }
    }

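The main job of this method is to send an OffsetFetchRequest that fetches the most recently committed offsets from the server, and then store them in the subscriptions (SubscriptionState).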
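The needsFetchCommittedOffsets flag works as a dirty flag: a commit sets it, and refreshCommittedOffsetsIfNeeded only contacts the server when it is set, clearing it afterwards. The minimal sketch below models that behavior; `RefreshCommitsSketch`, the `fetchCount` counter, and the `fetcher` function (a stand-in for fetchCommittedOffsets) are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class RefreshCommitsSketch {
    // Starts true so the first refresh always fetches from the server.
    private boolean needsFetchCommittedOffsets = true;
    final Set<String> assigned = new HashSet<>();
    final Map<String, Long> committed = new HashMap<>();
    int fetchCount = 0;   // number of (simulated) remote fetches

    // Corresponds to needRefreshCommits(), called when offsets are committed.
    void needRefreshCommits() { needsFetchCommittedOffsets = true; }

    // fetcher is a hypothetical stand-in for fetchCommittedOffsets().
    void refreshCommittedOffsetsIfNeeded(Function<Set<String>, Map<String, Long>> fetcher) {
        if (!needsFetchCommittedOffsets) return;
        fetchCount++;
        Map<String, Long> offsets = fetcher.apply(new HashSet<>(assigned));
        for (Map.Entry<String, Long> e : offsets.entrySet())
            if (assigned.contains(e.getKey()))   // verify the assignment is still active
                committed.put(e.getKey(), e.getValue());
        needsFetchCommittedOffsets = false;
    }

    public static void main(String[] args) {
        RefreshCommitsSketch s = new RefreshCommitsSketch();
        s.assigned.add("orders-0");
        Map<String, Long> server = new HashMap<>();
        server.put("orders-0", 105L);
        server.put("orders-9", 1L);                          // not assigned to this consumer
        s.refreshCommittedOffsetsIfNeeded(parts -> server);
        s.refreshCommittedOffsetsIfNeeded(parts -> server);  // flag is false: no second fetch
        System.out.println(s.committed + " fetches=" + s.fetchCount); // prints "{orders-0=105} fetches=1"
    }
}
```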

public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
        while (true) {
            // make sure we are connected to the GroupCoordinator
            ensureCoordinatorReady();

            // create the OffsetFetchRequest and queue it for sending
            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
            // block until the response arrives
            client.poll(future);
            // on success, return the result
            if (future.succeeded())
                return future.value();

            if (!future.isRetriable())
                throw future.exception();
            // on a retriable error, back off for retryBackoffMs and try again
            time.sleep(retryBackoffMs);
        }
    }
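The loop above is a blocking retry with backoff, and the synchronous commit described earlier retries in the same way. The minimal sketch below captures that shape generically; `BlockingRetrySketch`, `retryUntilSuccess`, and the local `RetriableException` are hypothetical names, and a throwing `Supplier` replaces Kafka's RequestFuture plus client.poll.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BlockingRetrySketch {
    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    // Loop until the request succeeds; retriable errors back off and retry,
    // any other RuntimeException propagates to the caller immediately.
    static <T> T retryUntilSuccess(Supplier<T> request, long retryBackoffMs) {
        while (true) {
            try {
                return request.get();      // blocking attempt, like client.poll(future)
            } catch (RetriableException e) {
                sleep(retryBackoffMs);     // like time.sleep(retryBackoffMs)
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Map<String, Long> offsets = retryUntilSuccess(() -> {
            if (calls.incrementAndGet() < 3)
                throw new RetriableException("offsets load in progress");
            Map<String, Long> result = new HashMap<>();
            result.put("orders-0", 105L);
            return result;
        }, 10);
        System.out.println(offsets + " after " + calls.get() + " calls"); // prints "{orders-0=105} after 3 calls"
    }
}
```

Like the original, this loops without an upper bound on attempts, so a retriable error that never clears blocks the caller indefinitely.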

Now let's look at how the OffsetFetchResponse is handled:

public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
            // iterate over the offsets returned by the server
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetFetchResponse.PartitionData data = entry.getValue();
                if (data.hasError()) {
                    Errors error = Errors.forCode(data.errorCode);
                    log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());
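                    // (error-code handling is elided in this excerpt; in Kafka each branch fails the future via future.raise(...) before returning)
                    return;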
                } else if (data.offset >= 0) {
                    // record the valid committed offset
                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
                } else {
                    log.debug("Group {} has no committed offset for partition {}", groupId, tp);
                }
            }
            // complete the future, passing the collected offsets on to its listeners
            future.complete(offsets);
        }
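The key rule in this handler is that a negative offset (Kafka uses -1) means the group has never committed an offset for that partition, so the partition is simply left out of the result. The minimal sketch below shows that filtering; `OffsetFetchSketch`, its `PartitionData` class, the string error field, and throwing an exception instead of raising a future are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetFetchSketch {
    // Hypothetical, simplified per-partition entry of an OffsetFetchResponse.
    static final class PartitionData {
        final long offset;
        final String error;   // null means no error
        PartitionData(long offset, String error) {
            this.offset = offset;
            this.error = error;
        }
    }

    // Keeps only partitions with a real committed offset (>= 0);
    // any partition-level error fails the whole fetch.
    static Map<String, Long> parse(Map<String, PartitionData> response) {
        Map<String, Long> offsets = new HashMap<>(response.size());
        for (Map.Entry<String, PartitionData> entry : response.entrySet()) {
            PartitionData data = entry.getValue();
            if (data.error != null)
                throw new IllegalStateException("fetch failed for " + entry.getKey() + ": " + data.error);
            if (data.offset >= 0)
                offsets.put(entry.getKey(), data.offset);
            // else: no committed offset for this partition, skip it
        }
        return offsets;
    }

    public static void main(String[] args) {
        Map<String, PartitionData> response = new HashMap<>();
        response.put("orders-0", new PartitionData(105L, null));
        response.put("orders-1", new PartitionData(-1L, null));   // never committed
        System.out.println(parse(response)); // prints "{orders-0=105}"
    }
}
```

A partition that ends up without a committed offset has its position reset according to the consumer's auto.offset.reset policy instead.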