Kafka in Depth: An Analysis of the Topic Deletion Process
To delete a topic, run the following command:
.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test
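This assumes ZooKeeper is at localhost and the topic to delete is test. What the command actually does is create a node named test under the ZooKeeper node /admin/delete_topics; the node name is the topic name. (Many blog posts say this node is ephemeral. It is not: it is a persistent node, and the controller removes it only when the topic has really been deleted.)
After running the command, the console prints: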
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
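In other words, running the delete command does not actually delete the topic. It only marks the topic for deletion by adding the node /admin/delete_topics/test in ZooKeeper. The output also reminds us that the delete.topic.enable switch has to be turned on beforehand.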
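The marking step can be modelled without a real ZooKeeper. The sketch below is a hypothetical in-memory stand-in (MarkerStore and its method names are made up for illustration and are not Kafka or ZooKeeper APIs). It shows that the command only adds a persistent marker, and that whether anything happens afterwards depends on delete.topic.enable.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the ZooKeeper path involved.
// MarkerStore is not a Kafka or ZooKeeper class; it only models the behaviour described above.
object MarkerStore {
  val DeleteTopicsPath = "/admin/delete_topics"

  // Persistent nodes: they stay until someone deletes them explicitly.
  private val nodes = mutable.Set.empty[String]

  def deleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic

  // What the --delete command does: create the marker node, nothing more.
  def markForDeletion(topic: String): Unit = nodes += deleteTopicPath(topic)

  // Topics currently marked, i.e. the children of /admin/delete_topics.
  def markedTopics: Set[String] =
    nodes.map(_.stripPrefix(DeleteTopicsPath + "/")).toSet

  // Only a controller running with delete.topic.enable=true reacts to the marker.
  def topicsControllerWillProcess(deleteTopicEnable: Boolean): Set[String] =
    if (deleteTopicEnable) markedTopics else Set.empty[String]
}
```

With the switch off, the marker simply sits in the store, which matches the note printed by the command.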
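Source code analysis of Kafka topic deletion
In Kafka, topic deletion is carried out by the TopicDeletionManager class, which KafkaController holds in its deleteTopicManager field.
When a broker is elected controller of the cluster, onControllerFailover in KafkaController is called, and that method calls deleteTopicManager.start() to start the topic deletion thread.
When the broker stops being the controller, onControllerResignation in KafkaController is called, and that method calls deleteTopicManager.shutdown() to stop the topic deletion thread.
onControllerFailover also initializes the partitionStateMachine state machine and registers the relevant event listeners, chiefly one that watches for changes to the children of the ZooKeeper node /admin/delete_topics.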
def onControllerFailover() {
if(isRunning) {
// ...
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
// ...
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}
class PartitionStateMachine{
def registerListeners() {
registerTopicChangeListener()
if(controller.config.deleteTopicEnable)
// register the listener that watches for changes to the children of /admin/delete_topics
registerDeleteTopicListener()
}
private def registerDeleteTopicListener() = {
zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}
private def deregisterDeleteTopicListener() = {
zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}
}
kafka.controller.PartitionStateMachine.DeleteTopicsListener
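DeleteTopicsListener watches for changes to the children of the ZooKeeper node /admin/delete_topics. When a child change occurs, that is, when a new topic needs to be deleted, handleChildChange is triggered and adds the topic to the queue in deleteTopicManager.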
class DeleteTopicsListener() extends IZkChildListener with Logging {
this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
val zkUtils = controllerContext.zkUtils
/**
* Invoked when a topic is being deleted
* @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleChildChange(parentPath : String, children : java.util.List[String]) {
// fired when the children of /admin/delete_topics change, i.e. when a new topic has been marked for deletion
inLock(controllerContext.controllerLock) {
var topicsToBeDeleted = {
import JavaConversions._
(children: Buffer[String]).toSet
}
debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
// check whether each topic exists; if it no longer does, delete the /admin/delete_topics/< topic_name > node directly
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
if(nonExistentTopics.size > 0) {
warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
}
topicsToBeDeleted --= nonExistentTopics
if(topicsToBeDeleted.size > 0) {
info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
// mark topic ineligible for deletion if other state changes are in progress
// i.e. if the topic is undergoing preferred replica election or partition reassignment, mark it as temporarily ineligible for deletion
topicsToBeDeleted.foreach { topic =>
val preferredReplicaElectionInProgress =
controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
}
// add the topics to the deletion manager's queue of topics awaiting deletion
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
}
}
/**
*
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}
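The triage done in handleChildChange can be restated as a pure function over sets. The following is a simplified sketch with illustrative names (DeletionTriage, Result and triage are not part of Kafka); the controller state is passed in as plain sets instead of being read from controllerContext.

```scala
// Simplified model of the listener's triage; the names below are illustrative, not Kafka's.
object DeletionTriage {
  final case class Result(
      markersToRemove: Set[String], // topics that no longer exist: only their marker node is removed
      ineligible: Set[String],      // topics blocked by an election or a reassignment
      enqueued: Set[String])        // topics handed to the deletion manager

  def triage(children: Set[String],
             allTopics: Set[String],
             electing: Set[String],
             reassigning: Set[String]): Result = {
    val nonExistent = children.filterNot(allTopics.contains)
    val toDelete = children -- nonExistent
    // Ineligible topics are still enqueued; they are marked so deletion is held back for now.
    val ineligible = toDelete.filter(t => electing.contains(t) || reassigning.contains(t))
    Result(nonExistent, ineligible, toDelete)
  }
}
```

Note that a topic marked ineligible is not dropped from the queue. It stays queued and is picked up again once it becomes eligible.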
TopicDeletionManager
class TopicDeletionManager(controller: KafkaController,
initialTopicsToBeDeleted: Set[String] = Set.empty,
initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
val controllerContext = controller.controllerContext
// partition state machine
val partitionStateMachine = controller.partitionStateMachine
// replica state machine
val replicaStateMachine = controller.replicaStateMachine
val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
val deleteLock = new ReentrantLock()
val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
(initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
val deleteTopicsCond = deleteLock.newCondition()
val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
var deleteTopicsThread: DeleteTopicsThread = null
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
/**
* Invoked at the end of new controller initiation
*/
def start() {
if (isDeleteTopicEnabled) {
// start the deletion thread only if delete.topic.enable=true
deleteTopicsThread = new DeleteTopicsThread()
if (topicsToBeDeleted.size > 0)
deleteTopicStateChanged.set(true)
deleteTopicsThread.start()
}
}
/**
* Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
*/
def shutdown() {
// Only allow one shutdown to go through
if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
// Resume the topic deletion so it doesn't block on the condition
// the deletion thread may be blocked in awaitTopicDeletionNotification at this point, so wake it up
resumeTopicDeletionThread()
// Await delete topic thread to exit, i.e. wait for its doWork to finish
deleteTopicsThread.awaitShutdown()
// clear the deletion state
topicsToBeDeleted.clear()
partitionsToBeDeleted.clear()
topicsIneligibleForDeletion.clear()
}
}
}
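The listing refers to awaitTopicDeletionNotification and resumeTopicDeletionThread without showing them. Below is a minimal sketch of how such a wait/notify pair can be built from a lock, a condition and a flag like the deleteLock, deleteTopicsCond and deleteTopicStateChanged fields above. DeletionSignal and its members are made-up names, and the body is an assumption about the mechanism, not a copy of Kafka's code.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

// Sketch of a wait/notify pair built from a lock, a condition and a flag.
// DeletionSignal is a hypothetical name; the real methods are not shown in this article.
class DeletionSignal {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private val stateChanged = new AtomicBoolean(false)
  val running = new AtomicBoolean(true)

  // Blocks until resume() has been called or the owner is no longer running.
  // The waiter consumes the flag by resetting it to false.
  def await(): Unit = {
    lock.lock()
    try {
      while (running.get() && !stateChanged.compareAndSet(true, false))
        cond.await()
    } finally lock.unlock()
  }

  // Sets the flag before signalling, so a wake-up issued before await() is not lost.
  def resume(): Unit = {
    stateChanged.set(true)
    lock.lock()
    try cond.signal() finally lock.unlock()
  }

  def pending: Boolean = stateChanged.get()
}
```

The flag is what lets start() record pending work before the thread is even running, and it is also why shutdown() has to call the resume method: a thread parked on the condition would otherwise never notice that it should stop.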
DeleteTopicsThread
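DeleteTopicsThread extends ShutdownableThread. ShutdownableThread is a thread that runs one method (doWork) in a loop, and it provides a shutdown method that waits synchronously until the thread has really finished. The code is fairly simple: it uses a CountDownLatch to block the thread that calls shutdown, and once doWork has really finished it releases the threads that were blocked.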
ShutdownableThread
abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
extends Thread(name) with Logging {
this.setDaemon(false)
this.logIdent = "[" + name + "], "
val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
def shutdown() = {
// set the running flag to false
initiateShutdown()
// wait for the task in progress to finish
awaitShutdown()
}
def initiateShutdown(): Boolean = {
if(isRunning.compareAndSet(true, false)) {
info("Shutting down")
isRunning.set(false)
if (isInterruptible)
interrupt()
true
} else
false
}
/**
* After calling initiateShutdown(), use this API to wait until the shutdown is complete
*/
def awaitShutdown(): Unit = {
// wait for the thread to finish running
shutdownLatch.await()
info("Shutdown completed")
}
/**
* This method is repeatedly invoked until the thread shuts down or this method throws an exception
*/
def doWork(): Unit
override def run(): Unit = {
info("Starting ")
try{
while(isRunning.get()){
doWork()
}
} catch{
case e: Throwable =>
if(isRunning.get())
error("Error due to ", e)
}
// count the latch down, releasing any thread waiting in awaitShutdown
shutdownLatch.countDown()
info("Stopped ")
}
}
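To see the loop-plus-latch pattern in isolation, here is a small self-contained re-creation with a usage example. LoopThread and LoopThreadDemo are illustrative names and the class is deliberately smaller than ShutdownableThread (no logging, no interrupt handling), so treat it as a sketch of the idea only.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Minimal re-creation of the loop-plus-latch pattern; not a Kafka class.
abstract class LoopThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)
  private val done = new CountDownLatch(1)

  def doWork(): Unit

  override def run(): Unit = {
    try { while (running.get()) doWork() }
    finally done.countDown() // release anyone blocked in shutdown()
  }

  // Returns only after run() has left its loop.
  def shutdown(): Unit = {
    running.set(false)
    done.await()
  }
}

object LoopThreadDemo {
  // Runs a worker, shuts it down, and returns the number of doWork calls seen,
  // or -1 if any work happened after shutdown() returned.
  def runOnce(): Int = {
    val ticks = new AtomicInteger(0)
    val started = new CountDownLatch(1)
    val t = new LoopThread("demo-loop") {
      def doWork(): Unit = { ticks.incrementAndGet(); started.countDown(); Thread.sleep(1) }
    }
    t.start()
    started.await() // make sure doWork ran at least once
    t.shutdown()    // blocks until the loop has exited
    val seen = ticks.get()
    Thread.sleep(20)
    if (ticks.get() == seen) seen else -1
  }
}
```

The point of the latch is the guarantee that once shutdown() returns, no further doWork call is in flight. That is what allows TopicDeletionManager.shutdown() to clear its sets safely afterwards.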
DeleteTopicsThread
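When a topic deletion notification arrives, doWork continues and handles each queued topic as follows:
If every replica has finished deleting the topic, it calls completeDeleteTopic to do the final cleanup, which removes the ZooKeeper nodes and clears the controller's in-memory state.
Otherwise, if no replica is in the deletion-started state but some replica is in ReplicaDeletionIneligible (possibly because it was previously involved in a preferred replica election or a partition reassignment), it resets those replicas so that deletion is retried.
If the topic is eligible for deletion and deletion has not already started, it calls onTopicDeletion to run the actual deletion logic.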
class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
val zkUtils = controllerContext.zkUtils
override def doWork() {
// wait for a topic deletion notification
awaitTopicDeletionNotification()
if (!isRunning.get)
return
inLock(controllerContext.controllerLock) {
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
if(!topicsQueuedForDeletion.isEmpty)
info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
topicsQueuedForDeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
// i.e. every replica has finished deleting the topic
if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// clear up all state for this topic from controller cache and zookeeper
completeDeleteTopic(topic)
info("Deletion of topic %s successfully completed".format(topic))
} else {
// at least one replica is in the ReplicaDeletionStarted state
if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
// ignore since topic deletion is in progress
val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
val replicaIds = replicasInDeletionStartedState.map(_.replica)
val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
partitions.mkString(","), topic))
} else {
// if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
// TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
// or there is at least one failed replica (which means topic deletion should be retried).
// i.e. no replica has started deletion and not every replica has deleted the topic:
// if any replica is in ReplicaDeletionIneligible, reset it so that deletion is retried
if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
// mark topic for deletion retry
markTopicForDeletionRetry(topic)
}
}
}
// Try delete topic if it is eligible for deletion.
// i.e. the topic may be deleted and deletion has not already started
if(isTopicEligibleForDeletion(topic)) {
info("Deletion of topic %s (re)started".format(topic))
// topic deletion will be kicked off
// trigger the topic deletion callback
onTopicDeletion(Set(topic))
} else if(isTopicIneligibleForDeletion(topic)) {
info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
}
}
}
}
}
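The first branch of doWork can be summarized as a pure function from the replica states of one topic to the action taken. This is a sketch under simplifying assumptions: the state names mirror the ones in the listing, while DoWorkDecision, Action and decide are illustrative names that do not exist in Kafka.

```scala
// Pure-function model of the per-topic branch in doWork; not Kafka code.
object DoWorkDecision {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  sealed trait Action
  case object Complete extends Action       // completeDeleteTopic
  case object WaitInProgress extends Action // deletion already running, only logged
  case object Retry extends Action          // markTopicForDeletionRetry
  case object NoStateChange extends Action  // deletion has not been initiated yet

  def decide(states: Seq[ReplicaState]): Action =
    if (states.forall(_ == ReplicaDeletionSuccessful)) Complete
    else if (states.contains(ReplicaDeletionStarted)) WaitInProgress
    else if (states.contains(ReplicaDeletionIneligible)) Retry
    else NoStateChange
}
```

After this branch, doWork separately checks isTopicEligibleForDeletion and, if it holds, calls onTopicDeletion. That second check is what starts deletion the first time and restarts it after a retry.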
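The completeDeleteTopic method
Once the topic has been deleted, completeDeleteTopic is called to do some cleanup, namely:
Delete the ZooKeeper node /brokers/topics/< topic_name >
Delete the ZooKeeper node /config/topics/< topic_name >
Delete the ZooKeeper node /admin/delete_topics/< topic_name >
and remove the topic's information from memory.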
private def completeDeleteTopic(topic: String) {
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
val zkUtils = controllerContext.zkUtils
// delete the ZooKeeper node /brokers/topics/< topic_name >
zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
// delete the ZooKeeper node /config/topics/< topic_name >
zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
// delete the ZooKeeper node /admin/delete_topics/< topic_name >
zkUtils.zkClient.delete(getDeleteTopicPath(topic))
// finally, remove the topic's information from memory
controllerContext.removeTopic(topic)
}
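As a quick reference, the ZooKeeper paths removed by completeDeleteTopic can be listed in the order the method uses. CleanupPaths is a hypothetical helper written for this article; the path layout is taken from the text above.

```scala
// Illustrative helper listing the znodes removed on completion, in the order used above.
// CleanupPaths is a made-up name, not part of Kafka.
object CleanupPaths {
  def forTopic(topic: String): Seq[String] = Seq(
    "/brokers/topics/" + topic,     // topic and partition data, removed recursively
    "/config/topics/" + topic,      // per-topic configuration, removed recursively
    "/admin/delete_topics/" + topic // the deletion marker, removed last
  )
}
```

Removing the marker last presumably means that if the controller fails part-way through, the marker is still there for the next controller to pick up through initialTopicsToBeDeleted. This is an inference from the ordering, not something the source states.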
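The markTopicForDeletionRetry method
Resets the topic's replicas that are in ReplicaDeletionIneligible back to the OfflineReplica state so that deletion is retried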
private def markTopicForDeletionRetry(topic: String) {
// reset replica states from ReplicaDeletionIneligible to OfflineReplica
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
.format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
}
The onTopicDeletion method
onTopicDeletion eventually calls startReplicaDeletion to begin deleting all partitions of the topic
private def onTopicDeletion(topics: Set[String]) {
info("Topic deletion callback for %s".format(topics.mkString(",")))
// send update metadata so that brokers stop serving data for topics to be deleted
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
// i.e. push updated metadata to every broker so that they stop serving data for these topics and get ready to delete it
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
topics.foreach { topic =>
onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
}
}
private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
startReplicaDeletion(replicasPerPartition)
}
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
// move dead replicas directly to failed state
// i.e. mark every replica on a dead broker as ReplicaDeletionIneligible (cannot be deleted for now)
replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
// send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
// i.e. move every live, not yet deleted replica to OfflineReplica and notify its broker, so that it stops sending fetch requests for this topic to the leader
replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
// send a StopReplicaRequest for these replicas; when it completes, deleteTopicStopReplicaCallback is called back
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
if(deadReplicasForTopic.size > 0) {
debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
markTopicIneligibleForDeletion(Set(topic))
}
}
}
// When topic deletion starts, a StopReplicaRequest is sent to the live brokers; this method is called back when the request completes
private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
val responseMap = stopReplicaResponse.responses.asScala
val partitionsInError =
if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
inLock(controllerContext.controllerLock) {
// move all the failed replicas to ReplicaDeletionIneligible
// i.e. replicas that reported an error are taken out of the set of deletable replicas
failReplicaDeletion(replicasInError)
if (replicasInError.size != responseMap.size) {
// some replicas could have been successfully deleted
val deletedReplicas = responseMap.keySet -- partitionsInError
completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
}
}
}
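The set arithmetic at the top of startReplicaDeletion decides which replicas are parked and which are actually deleted. The sketch below reduces it to plain data. Replica, Plan and plan are illustrative names, and broker liveness is passed in as a set of broker ids instead of being read from controllerContext.

```scala
// Set arithmetic from startReplicaDeletion, reduced to plain data; not Kafka code.
object ReplicaDeletionPlan {
  final case class Replica(topic: String, partition: Int, broker: Int)
  final case class Plan(dead: Set[Replica], retry: Set[Replica], topicIneligible: Boolean)

  def plan(replicas: Set[Replica],
           liveBrokers: Set[Int],
           alreadyDeleted: Set[Replica]): Plan = {
    val alive = replicas.filter(r => liveBrokers.contains(r.broker))
    val dead = replicas -- alive        // moved to ReplicaDeletionIneligible
    val retry = alive -- alreadyDeleted // moved to OfflineReplica, then ReplicaDeletionStarted
    // A single dead replica is enough to mark the whole topic ineligible for now.
    Plan(dead, retry, topicIneligible = dead.nonEmpty)
  }
}
```

For a topic with 6 partitions and replication factor 3 where one of three brokers is down, this yields 6 dead replicas and 12 replicas to delete, and the topic as a whole is marked ineligible until the dead broker returns.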
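The Kafka topic deletion process
Having gone through the source, let us summarize how Kafka deletes a topic.
1. After a Kafka broker is elected controller, it performs the following steps:
1.1 It registers DeleteTopicsListener to watch for changes to the children of the ZooKeeper node /admin/delete_topics. The delete command simply creates a node under that path, named after the topic to delete, which marks the topic as pending deletion.
1.2 It creates a dedicated thread, DeleteTopicsThread, to carry out topic deletion.
2. When DeleteTopicsThread starts, it first blocks in awaitTopicDeletionNotification and waits for a deletion notification, that is, for a new topic to be added to the queue of topics awaiting deletion.
3. We run the delete command, which creates the child node < topic_name > under the ZooKeeper node /admin/delete_topics.
4. DeleteTopicsListener receives the ChildChange event and works through the following checks in order:
4.1 It checks whether the topic exists. If it no longer exists, it deletes the /admin/delete_topics/< topic_name > node directly.
4.2 It checks whether the topic is currently undergoing preferred replica election or partition reassignment. If so, it marks the topic as temporarily ineligible for deletion.
4.3 It adds the topic to the queue, which wakes the deletion thread blocked in awaitTopicDeletionNotification inside doWork so that it continues.
The deletion thread then performs the actual deletion as follows:
1. It first sends updated metadata to every broker so that they stop serving data for the topic and get ready to delete it.
2. It starts deleting all partitions of the topic:
2.1 It sends a request to all brokers telling them that these partitions are being deleted. After receiving it, the brokers no longer accept any client requests for these partitions.
2.2 It moves every replica of each partition to the OfflineReplica state, so the ISR keeps shrinking. When the leader replica is finally moved to OfflineReplica as well, the leader is updated to -1.
2.3 It moves all replicas to the ReplicaDeletionStarted state.
2.4 The replica state machine picks up the state change and sends a StopReplicaRequest to the brokers. On receiving it, a broker stops all fetcher threads, clears its caches, and then deletes the underlying log files.
2.5 It shuts down all idle fetcher threads.
3. It deletes the ZooKeeper node /brokers/topics/< topic_name >.
4. It deletes the ZooKeeper node /config/topics/< topic_name >.
5. It deletes the ZooKeeper node /admin/delete_topics/< topic_name >.
6. Finally, it removes the topic's information from memory.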
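The replica states a successfully deleted replica passes through, as named in the listings above, can be written down as a small lookup table. This sketch covers only the success path mentioned in this article; it is not Kafka's full replica state machine, and DeletionPath is a made-up name.

```scala
// The replica-state sequence on the success path, as a lookup table.
// Only the transitions named in this article are covered; failure edges are left out.
object DeletionPath {
  val next: Map[String, String] = Map(
    "OnlineReplica" -> "OfflineReplica",
    "OfflineReplica" -> "ReplicaDeletionStarted",
    "ReplicaDeletionStarted" -> "ReplicaDeletionSuccessful",
    "ReplicaDeletionSuccessful" -> "NonExistentReplica"
  )

  // Follows the table from a starting state until no further step is defined.
  def pathFrom(state: String): List[String] =
    state :: next.get(state).map(pathFrom).getOrElse(Nil)
}
```

A replica that fails along the way ends up in ReplicaDeletionIneligible instead and is reset to OfflineReplica by markTopicForDeletionRetry, after which it re-enters this path.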
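[Figure: flow chart of Kafka topic deletion]
Q&A
We have analyzed the source code for Kafka topic deletion and summarized the process. Now let us look at the following related questions to deepen our understanding of it.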
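Q1: Can a topic be deleted normally when some of its partition replicas are down?
Edit server.properties on each of the three brokers and set delete.topic.enable=true.
Start ZooKeeper and the three brokers (broker1, broker2 and broker3), and start kafka-manager, where:
ZooKeeper listens on port 2181,
broker1 listens on port 9091, with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_1
broker2 listens on port 9092, with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_2
broker3 listens on port 9093, with log directory D:\Workspaces\git\others\kafka\kafkaconifg\broker_3
kafka-manager listens on port 9000; visit http://localhost:9000 to view the state of the Kafka cluster
Begin the experiment by creating the topic test: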
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic test
Then write a few test messages:
.\kafka-console-producer.bat --broker-list localhost:9092 --topic test
111
222
333
444
555
666
Inspect the path in ZooKeeper:
ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4, 5]
Stop broker2 and run the delete command:
.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test
Inspect the ZooKeeper path /admin/delete_topics:
[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[test]
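After a few seconds, only broker2's log directory still contains the folders for topic test; the test logs under the log directories of broker1 and broker3 have already been deleted: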
test-0,test-1,test-2,test-3,test-4,test-5
Inspect controller.log on broker1:
[2017-12-06 12:14:39,181] DEBUG [DeleteTopicsListener on 1]: Delete topics listener fired for topics test to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,182] INFO [DeleteTopicsListener on 1]: Starting topic deletion for topics test (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,184] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,186] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> OnlineReplica, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=3] -> OnlineReplica, [Topic=test,Partition=3,Replica=3] -> OnlineReplica, [Topic=test,Partition=5,Replica=1] -> OnlineReplica, [Topic=test,Partition=0,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=1] -> OnlineReplica, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> OnlineReplica, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,187] INFO [delete-topics-thread-1], Deletion of topic test (re)started (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,188] INFO [Topic Deletion Manager 1], Topic deletion callback for test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,191] INFO [Topic Deletion Manager 1], Partition deletion callback for [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,194] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionIneligible for replicas [Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] INFO [Replica state machine on controller 1]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":1,"leader_epoch":5,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,207] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,208] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":3,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,230] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,231] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":4,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,239] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,240] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,243] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 1 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 3 is (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 3 is [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,245] DEBUG [Topic Deletion Manager 1], Dele