Kafka - 叢集更新元資料
阿新 • • 發佈:2018-12-13
Kafka叢集更新元資料 - waitOnMetadata()
通過比較版本號的方式,控制資料一致性。類似樂觀鎖的方式。 Sender執行緒在更新成功元資料之前,會一直阻塞主執行緒。
private long waitOnMetadata(String topic, logn maxWaitMs) throws InterruptedException { // metadata裡沒有指定topic的鍵值,就新增這個鍵值。 if (!this.metadata.containsTopic(topic)) metadata.add(topic); // metadata裡的指定topic不為空,表示元資料修改完成 if (metadata.fetch().partitionsForTopic(topic) != null) return 0; // 程式的起始執行時間 long begin = time.milliseconds(); // 剩餘時間(開始為最大等待時間) long remainingWaitMs = maxWaitMs; // 當元資料沒有修改完成的時候 while (metadata.fetch().partitionsForTopic(topic) == null) { // 獲取當前版本號 int version = metadata.requestUpdate(); // 喚醒sender執行緒,由sender執行緒執行元資料修改 sender.wakeup(); // 更新元資料 metadata.awaitUpdate(version, remainingWaitMs); // 程式的執行時間 long elapsed = time.milliseconds() - begin; if (eplased >= maxWaitMs) throw new TimeoutException(..); // 許可權的驗證 if (metadata.fetch().unauthorizedTopics().contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; } // 最終的程式執行時間 return time.milliseconds() - begin; }
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) { .... long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; // 記錄的版本號小於等於最新版本號 while (this.version <= lastVersion) { if (remainingWaitMs != 0) // 阻塞主執行緒 wait(remainingWaitMs); // 程式的執行時間 long elapsed = System.currentTimeMillis - begin; // 如果程式的執行時間大於等於最大等待時間 if (elapsed >= maxWaitMs) throw new TimeOutException(..); // 剩餘時間等於最大等待時間減去程式執行時間 remainingWaitMs = maxWaitMs - elapsed; } }