1. 程式人生 > >Kafka - 叢集更新元資料

Kafka - 叢集更新元資料

Kafka叢集更新元資料 - waitOnMetadata()

通過比較版本號的方式,控制資料一致性。類似樂觀鎖的方式。 Sender執行緒在更新成功元資料之前,會一直阻塞主執行緒。

private long waitOnMetadata(String topic, logn maxWaitMs) throws InterruptedException {
	// metadata裡沒有指定topic的鍵值,就新增這個鍵值。
  if (!this.metadata.containsTopic(topic))
		metadata.add(topic);
    // metadata裡的指定topic不為空,表示元資料修改完成
	if (metadata.fetch().partitionsForTopic(topic) != null)
		return 0;
    // 程式的起始執行時間
	long begin = time.milliseconds();
    // 剩餘時間(開始為最大等待時間)
	long remainingWaitMs = maxWaitMs;
    // 當元資料沒有修改完成的時候
	while (metadata.fetch().partitionsForTopic(topic) == null) {
     // 獲取當前版本號
		int version = metadata.requestUpdate();
        // 喚醒sender執行緒,由sender執行緒執行元資料修改
		sender.wakeup();
    // 更新元資料
		metadata.awaitUpdate(version, remainingWaitMs);
    // 程式的執行時間
		long elapsed = time.milliseconds() - begin;
		if (eplased >= maxWaitMs) 
			throw new TimeoutException(..); 
    // 許可權的驗證
		if (metadata.fetch().unauthorizedTopics().contains(topic)) 
			throw new TopicAuthorizationException(topic);
		remainingWaitMs = maxWaitMs - elapsed;
	}
    // 最終的程式執行時間
	return time.milliseconds() - begin;
}
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
	....
	long begin = System.currentTimeMillis();
	long remainingWaitMs = maxWaitMs;
    // 記錄的版本號小於等於最新版本號
	while (this.version <= lastVersion) {
		if (remainingWaitMs != 0) 
            // 阻塞主執行緒
			wait(remainingWaitMs);
        // 程式的執行時間
		long elapsed = System.currentTimeMillis - begin;
    // 如果程式的執行時間大於等於最大等待時間
		if (elapsed >= maxWaitMs)
			throw new TimeOutException(..);
        // 剩餘時間等於最大等待時間減去程式執行時間
		remainingWaitMs = maxWaitMs - elapsed;
	}
}