1. 程式人生 > >kafka---broker 保存消息

kafka---broker 保存消息

ids -- 過期 存儲 序列圖 所有 fire mage sta

1 、存儲方式

物理上把 topic 分成一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的所有消息和索引文件),如下:

技術分享圖片

圖.4

2 、存儲策略

無論消息是否被消費,kafka 都會保留所有消息。有兩種策略可以刪除舊數據:

1. 基於時間:log.retention.hours=168
2. 基於大小:log.retention.bytes=1073741824

需要註意的是,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這裏刪除過期文件與提高 Kafka 性能無關。

3、 topic 創建與刪除

3.1 創建 topic

創建 topic 的序列圖如下所示:

技術分享圖片

圖.5

流程說明:

1. controller 在 ZooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被創建,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
2. controller從 /brokers/ids 讀取當前所有可用的 broker 列表,對於 set_p 中的每一個 partition:
	2.1 從分配給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,並將AR設置為新的 ISR
	2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 通過 RPC 向相關的 broker 發送 LeaderAndISRRequest。

3.2 刪除 topic

刪除 topic 的序列圖如下所示:

技術分享圖片

圖.6

流程說明:

1. controller 在 zooKeeper 的 /brokers/topics 節點上註冊 watcher,當 topic 被刪除,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
2. 若 delete.topic.enable=false,結束;否則 controller 註冊在 /admin/delete_topics 上的 watch 被 fire,controller 通過回調向對應的 broker 發送 StopReplicaRequest。

kafka---broker 保存消息