Fetcher: KafkaConsumer訊息消費的管理者
我們在客戶端使用KafkaConsumer類進行Kafka訊息的消費,其實KafkaConsumer是將建立消費請求、接收響應的操作全部交給了Fetcher去處理。我們從KafkaConsumer.poll()
方法進入,解析Fetcher的工作流程。
在我們看具體實現以前,不妨來了解一下每一次呼叫KafkaConsumer.poll(long timeout)
方法消費訊息的時候,Kafka的基本工作流程:
poll(timeout){
根據poll(timeout)引數,估算剩餘時間
while(還有剩餘時間)
從Fetcher端拉取消費到的訊息
if (訊息數量不為空)
建立傳送請求
立刻將請求傳送
else
return
end //if ends
計算剩餘時間
end //while ends
}
從上述虛擬碼可以看到,在超時時間到達之前,KafkaConsumer會反覆通過呼叫KafkaConsumer.poll()
進行訊息的拉取,其實這次訊息的獲取是上一次請求的返回資料,同時,每一次poll請求,KafkaConsumer都會順便再一次傳送請求以便下一次poll操作能夠直接獲取返回結果。
看到這裡,肯定有人會問,每次poll完成以後都再一次傳送請求,那是否會讓每一次poll()
ConsumerNetworkClient.send()
方法看出,讀者可自行閱讀程式碼。
public ConsumerRecords<K, V> poll(long timeout) {
acquire();//確保只有一個唯一執行緒呼叫poll方法
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative" );
// poll for new data until the timeout expires
long start = time.milliseconds();
long remaining = timeout;
do {
//進行一次消費操作
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
fetcher.sendFetches();//在請求到資料以後,順便傳送下一次請求,由於請求是非同步傳送,因此並不會影響本次訊息消費的效率
client.pollNoWakeup();//傳送一個poll請求,並且是立刻返回的,因為timeout=0
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;//計算剩餘可用的時間
remaining = timeout - elapsed;
} while (remaining > 0);
return ConsumerRecords.empty();
} finally {
release();
}
}
在保證超時時間沒有到達的前提下,通過呼叫pollOnce()
來進行一次訊息的拉取,其實是呼叫一次Fetcher.fetchedRecords()
方法取出已經收到的Kafka訊息:
/**
* 進行一次消費操作,如果這次操作直接在fetcher已經存在,則直接返回這些已經完成的結果,而如果fetcher沒有返回任何結果,則會強行進行一次poll操作。
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
// TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
//確認服務端的GroupCoordinator已經獲取並且已經能夠接受請求
coordinator.ensureCoordinatorReady();
// ensure we have partitions assigned if we expect to
//確認已經完成了分割槽分配
if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
long now = time.milliseconds();
// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
//執行heartbeat任務或者自動提交offset任務
client.executeDelayedTasks(now);
// init any new fetches (won't resend pending fetches)
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();//直接獲取已經收到的資料
// if data is available already, e.g. from a previous network client poll() call to commit,
// then just return it immediately
if (!records.isEmpty())
return records;
//如果沒有接收到任何一條訊息,則真正地傳送fetch請求
fetcher.sendFetches();
client.poll(timeout, now);
return fetcher.fetchedRecords();
}
pollOnce()
的基本執行邏輯,就是首先確保遠端的GroupCoordinator是正常並且已經連線的狀態。在這裡我需要解釋一下Kafka的兩種型別的Coordinator:
- ConsumerCoordinator:客戶端角色,每一個客戶端的Consumer都會有一個ConsumerCoordinator與之對應,ConsumerCoordinator負責代理這個Consumer與遠端的GroupCoordinator進行溝通,比如joinGroup、針對自己在group中的leader或者follower身份進行不同的操作(必須被選舉為leader的ConsumerCoordinator會獲取整個group的消費者的訂閱情況然後進行分割槽分派,follower身份的ConsumerCoordinator只需要被動接受分派給自己的分割槽)
- GroupCoordinator:服務端角色,每一個Group的管理者,用來管理這個Group中所有的ConsumerCoordinator,比如leader的選舉。注意,我們必須把group leader選舉和分割槽分派區別開。一個ConsumerCoordinator在進行joinGroup操作的時候,GroupCoordinator會從所有的ConsumerCoordinator選舉出來一個Leader,然後Leader進行分割槽分派,即為Group中的所有ConsumerCoordinator分派分割槽。即分割槽分派其實是在客戶端進行,而不是服務端進行。
在pollOnce()
開始時,首先需要確認消費訊息以前的所有準備工作已經做完,包括:
- 已經確認遠端的GroupCoordinator:在初始化狀態下,一個Consumer並不清楚自己所在的Group對應的GroupCoordinator會在哪臺Kafka Server上,因此會選擇一個Kafka Server,傳送請求獲取GroupCoordinator
- 已經完成joinGroup操作:在獲取了GroupCoordinator的身份以後,會進行joinGroup操作。GroupCoordinator會從所有的ConsumerCoordinator中選舉一個作為這個group的leader,剩餘的作為follower。因此需要確認自己已經成功進行了joinGroup操作。
- 已經完成了分割槽分派:在joinGroup操作中被選舉為leader的ConsumerCoordinator會負責進行分割槽分派,即將group中所有topic的每個分割槽分派給對應的ConsumerCoordinator進行消費,因此需要確認。
通過coordinator.ensureCoordinatorReady();
確認GroupCoordinator的身份已經明確並且可以接收請求。如果發現GroupCoordinator還沒有準備好,則該方法會一直block直到其處於ready的狀態:
/**
* Block until the coordinator for this group is known and is ready to receive requests.
* 等待直到我們和服務端的GroupCoordinator取得連線
*/
public void ensureCoordinatorReady() {
while (coordinatorUnknown()) {//無法獲取GroupCoordinator
RequestFuture<Void> future = sendGroupCoordinatorRequest();//傳送請求
client.poll(future);//同步等待非同步呼叫的結果
if (future.failed()) {
if (future.isRetriable())
client.awaitMetadataUpdate();
else
throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
coordinatorDead();
time.sleep(retryBackoffMs);//等待一段時間,然後重試
}
}
}
同時,通過 coordinator.ensurePartitionAssignment();
確認已經成功加入了group並且分派給自己的分割槽都是正常的。
當確認了自己與GroupCoordinator的所有狀態都正常,在正式獲取資料之前,還會對已經到達執行時間的定時任務執行。這種定時任務主要包括兩種:
- 心跳任務:心跳任務(HeartbeatTask)用來告知GroupCoordinator自己還活著。如果GroupCoordinator長期沒有收到心跳,將會認為這個ConsumerCordinator已經退出,從而通過rebalance來將ConsumerCoordinator從group中移除。
- offset提交任務:offset提交任務(AutoCommitTask)是當用戶設定了consumer的offset提交模式為自動提交以後,用來告知遠端的ConsumerCoordinator自己已經消費到的訊息位置。每次提交,都會執行
AutoCommitTask.run()
方法,同時,AutoCommitTask.run()
中,會呼叫AutoCommitTask.reschedule()
再次提交一個任務,從而實現這個定時任務的不斷提交,即offset的不斷提交。
注意,這兩種定時任務在Kafka上叫做delayedTask
,即可以 容忍適當延遲 的任務。客戶端每次執行poll操作,都會檢查這些延遲任務的執行時間是否已經到了,如果到了就執行。同時,我們看到,遠端的GroupCoordinator是通過心跳來判斷ConsumerCoordinator的心跳來判斷ConsumerCoordinator是否還活著,而心跳資訊只有在poll()被呼叫的時候發出,因此,如果我們在兩次相鄰地poll之間的時間超過閾值,GroupCoordinator會認為ConsumerCoordinator已經消失並進行rebalance操作。咋大多數情況下,無論Kafka的程式碼多麼的健壯,一次rebalce都會是一次不穩定因素,是應該竭力避免的行為。因此,我們應該通過合理設定一下兩個引數,來竭力避免兩次poll相鄰時間過長導致的rebalance:
max.poll.records
:合理設定每次poll的訊息消費數量,如果數量過多,導致一次poll操作返回的訊息記錄無法在指定時間內完成,則會出發rebalance;max.poll.interval.ms
:盡力保證一次poll的訊息能夠很快完成,無論我們的業務程式碼在拿到poll()
的結果之後做了什麼操作,比如需要存入hdfs、需要存入hive、關係型資料庫,都需要對消耗的時間進行預估,保證時間不會太長;
在執行完了中的延遲任務以後,開始呼叫fetcher.fetchedRecords();
獲取資料。上面已經說過,這次獲取的資料是上一次poll發出的請求所返回的資料,因此是直接從記憶體中獲取的已有資料:
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
if (this.subscriptions.partitionAssignmentNeeded()) {//是否需要重新進行分割槽分配
return Collections.emptyMap();//返回空結果
} else {
//儲存返回結果,key為TopicPartition,value為這個TopicPartition的所有消費到到資料
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
//從方法sendFetches可以看到,每一個CompletedFetch的一條資料,是某個TopicPartition的一批資料
Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();//遍歷已經返回的結果
while (recordsRemaining > 0) {//計算剩餘可以poll的訊息量
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {//第一次進入迴圈
if (!completedFetchesIterator.hasNext())
break;
CompletedFetch completion = completedFetchesIterator.next();
completedFetchesIterator.remove();
//將位元組訊息轉換成ConsumerRecord物件
nextInLineRecords = parseFetchedData(completion);
} else {
//將資料從nextInLineRecords中取出,放入到drained中,並且清空nextInLineRecords,更新offset
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}
}
return drained;
}
}
fetchedRecords()
方法中,通過不停地迭代遍歷儲存了已完成的消費請求所返回到資料的List<CompletedFetch> completedFetches
,從中取出CompletedFetch,但是由於CompletedFetch中儲存是返回的原始位元組碼資料,因此會將位元組碼翻譯為資料物件,依照資料的TopicPartition,存入到Map<TopicPartition, List<ConsumerRecord<K, V>>> drained
中。當訊息數量已經不小於使用者配置的最大消費訊息數量,活著當前completedFetches已經沒有了資料,則迴圈退出,返回資料。其中比較重要的方法是private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,PartitionRecords<K, V> partitionRecords, int maxRecords)
方法:
private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
PartitionRecords<K, V> partitionRecords,
int maxRecords) {
if (partitionRecords.isEmpty())
return 0;
if (!subscriptions.isAssigned(partitionRecords.partition)) {//判斷是否是分配給自己的分割槽
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
} else {//是自己的分割槽
// note that the consumed position should always be available as long as the partition is still assigned
long position = subscriptions.position(partitionRecords.partition);//當前的分割槽消費位置
//當且僅當1.這個分割槽的確是分派給這個consumer 2當前不是pause狀態 3.當前存在合法的分割槽位置,這個分割槽才會是fetchable
if (!subscriptions.isFetchable(partitionRecords.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
} else if (partitionRecords.fetchOffset == position) {//分割槽位置校驗通過
// we are ensured to have at least one record since we already checked for emptiness
List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;//下一個offset是當前收到的最後一條訊息的offset+1
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
//將這一批資料儲存到map中
List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
if (records == null) {
records = partRecords;
drained.put(partitionRecords.partition, records);
} else {
records.addAll(partRecords);
}
//更新offset
subscriptions.position(partitionRecords.partition, nextOffset);
return partRecords.size();
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
partitionRecords.partition, partitionRecords.fetchOffset, position);
}
}
partitionRecords.discard();
return 0;
}
這個方法等職責比較關鍵,核心任務是把返回的一批資料按照TopicPartition
歸類,存入Map<TopicPartition, List<ConsumerRecord<K, V>>> drained
作為最終返回資料,同時,還進行了資料校驗:
- 對於每條資料,校驗資料所在的分割槽是不是分派給自己的分割槽,因為所有Consumer只有權利消費自己訂閱的並且在分割槽分派時的確分派給了自己的分割槽;
- 判斷這個分割槽處於fetchable狀態,判斷標準是:
- 這個分割槽的確是分派給這個consumer;
- 當前不是pause狀態,pause的發生是顯式呼叫
KafkaConsuer.pause()
方法,用來暫停消費; - 當前存在合法的分割槽位置,所謂合法,即Consumer端記錄的上次的消費位置是存在的,而不是空的;
- 分割槽位置嚴格校驗:Kafka客戶端本地儲存了上一次消費的最後一條訊息的下一個offset值,因此,在正常情況下,本次請求的一批記錄的第一條的offset值,必須等於該值,如果不等於,則忽略資料。
當所有校驗通過,則將資料儲存在drained中作為最終返回結果,同時,通過subscriptions.position(partitionRecords.partition, nextOffset);
更新本地儲存的該TopicPartition對應的分割槽位置為nextOffset:
從上述程式碼:long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
,nextoffset
是下一條訊息的offset值。
在上文中,我們從KafkaConsumer.poll(timeout)
方法為入口,分析了消費者如何通過Fetcher進行訊息消費的。我們說過,每次訊息消費,都是上一次請求對應的返回結果,是從記憶體中直接獲取的請求。因此,現在我們來看看每一次的消費請求是如何發出的。
其實,從poll(timeout)
的程式碼可以看到,每次消費完資料,都會通過Fetcher.sendFetches()
順帶傳送下一次的消費請求:
public void sendFetches() {
//呼叫createFetchRequests建立傳送請求,然後逐個請求傳送到遠端broker
for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
final FetchRequest request = fetchEntry.getValue();//request是對某個節點上的某個TopicPartition的請求資料
//ConsumerNetworkClient.send會將請求放到unsend中
client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = new FetchResponse(resp.responseBody());
//獲取這一批響應資料中的所有的TopicPartition
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
//對響應資料進行遍歷
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).offset;//請求傳送的時候這個TopicPartition的offset
FetchResponse.PartitionData fetchData = entry.getValue();//fetchData中存放了這個TopicPartition所返回的資料
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
} sensors.fetchLatency.record(resp.requestLatencyMs()); sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch failed", e);
}
});
}
}
sendFetches()
方法通過createFetchRequests()
來建立請求,然後,將請求通過ConsumerNetworkClient.send()
逐漸傳送出去。ApiKeys.FETCH
代表了請求型別為資料請求,即消費請求,除了資料消費請求,還有各種其它請求,都是通過ConsumerNetworkClient.send()
傳送到遠端的,比如:
ApiKeys.PRODUCE 生產訊息的請求
ApiKeys.METADATA:獲取伺服器元資料的請求
ApiKeys.JOIN_GROUP:加入到group的請求
ApiKeys.LEAVE_GROUP:離開group請求
ApiKeys.SYNC_GROUP:同步group資訊的請求
ApiKeys.HEARTBEAT:心跳請求
ApiKeys.OFFSET_COMMIT:提交offset的請求
ApiKeys.OFFSET_FETCH:獲取遠端offset的請求
client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
是通過非同步回撥的方式來處理返回結果,通過定義一個實現了RequestFutureListener的匿名實現類,實現了收到相應成功或者失敗以後的回撥:
.addListener()
public interface RequestFutureListener<T> {
void onSuccess(T value);
void onFailure(RuntimeException e);
}
當成功收到相應,會將訊息經過處理放入到List<CompletedFetch> completedFetches
中。上文已經說過,Fetcher.fetchedRecords
就是從completedFetches
獲取訊息的。
同時,我們一起來看看Fetcher是如何建立資料消費請求的:
/**
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
* 建立fetch請求,這個請求的key是node,value是一個FetchRequest物件,這個物件封裝了對這個節點上的一個或者多個TopicPartition的資料獲取請求
*/
private Map<Node, FetchRequest> createFetchRequests() {
// create the fetch info
Cluster cluster = metadata.fetch();
//fetchable的key是節點,value是在這個節點上所有TopicPartition的請求資訊
Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
for (TopicPartition partition : fetchablePartitions()) {//對於每一個partition
Node node = cluster.leaderFor(partition);//檢視這個partition的leader節點
if (node == null) {
metadata.requestUpdate();//node是空,則重新更新元資料
} else if (this.client.pendingRequestCount(node) == 0) {//如果這個節點上的pending請求為0,pending既包括in-flight,也包括unsent
// if there is a leader and no in-flight requests, issue a new fetch
Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new HashMap<>();
fetchable.put(node, fetch);
}
long position = this.subscriptions.position(partition);
//將當前的offset資訊、請求資料的大小放入request中
fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));//將每個partition的請求儲存
log.trace("Added fetch request for partition {} at offset {}", partition, position);
}
}
// create the fetches
Map<Node, FetchRequest> requests = new HashMap<>();
for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
requests.put(node, fetch);
}
return requests;
}
createFetchRequests()
的執行虛擬碼:
獲取叢集元資料
獲取所有的fetchablePartitions
for(每一個fetchablePartition){
獲取這個partition的leader node
if(無法獲取lead node資訊)
傳送元資料更新請求
else
{
建立對這個節點的資料獲取請求,儲存在一個Map中
}
}
請求建立完畢,儲存在Map中,返回這個Map
createFetchRequests會獲取所謂fetchablePartitions,那麼,究竟哪些TopicPartition被認為是fetchable的呢?
我們一起來看 :
private Set<TopicPartition> fetchablePartitions() {
Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
//從fetchedRecords()方法中可以看到,nextInLineRecords代表正在進行處理的返回結果
if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
fetchable.remove(nextInLineRecords.partition);
//completedFetches代表已經取回的等待消費的資料
for (CompletedFetch completedFetch : completedFetches)
fetchable.remove(completedFetch.partition);
return fetchable;
}
- 首先,當然,這個TopicPartition必須是分派給自己的TopicPartition
- 這個TopicPartition不是處於paused狀態
- 這個TopicPartition有合法的position,即對於這個TopicPartition有合法的消費位置offset的記錄
- 這個TopicPartition在Fetcher物件裡面不存在已經取回但是還沒被消費的資料
以上就是KafkaConsumer委託Fetcher建立消費請求、獲取消費資料的基本流程,其實涉及到比較多的東西,包括通過ConsumerCoordinator代理自己與遠端的GroupCoordinator進行溝通,進入和離開Group,分割槽的分派,通過ConsumerNetworkClient負責底層的網路通訊,通過SubscriptionState物件維護本地的TopicPartition的資訊,獲取到訊息以後的校驗,通過定時任務進行自動offset提交,通過定時任務進行心跳以報告活性等等。有興趣的讀者可以自行詳細閱讀程式碼。我將會有更多的部落格來對本過程涉及到的其他方面進行專門的介紹。
雖然Kafka的核心程式碼在Server端,但是從Consumer或者Producer端進入,基本上可以看到整個訊息通訊的基本邏輯、設計和業務流程。Consumer端的程式碼在保證高效、節點網路流量的負載均衡以及客戶端和服務端所有狀態的一致性、單執行緒方面做了大量非常好的設計和解決方案,同時,通過ConsumerGroup的概念、Topic訂閱的概念、基於Master/Slave設計的Group責任制(一個Group只有一個Consumer會被選舉為Group Leader,剩餘未Follower)、基於Master/Slave設計的TopicPartition責任制(對於每一個TopicPartition,只有一個Consumer會被選舉為Leader,剩餘作為Repliation),使得Kafka的訊息系統具有非常棒的輕鬆橫向擴充套件性,分散式環境下也有了很好的資料一致性(所有TopicParition的請求都發往這個TopicParition 的leader),這是我非常喜歡Kafka的一個重要原因。當然,這也對服務端的Leader角色提出了非常高的併發性。後面我們會介紹基於Reactor模式的設計,Kafka Server能夠很好處理高併發響應、多工處理的切換等。