kafka使用與設計原理
第一部分 kafka基礎簡介
kafka簡介
kafka是apache開源的基於zookeeper協調的分散式訊息系統,具有高吞吐率(可做到單機每秒幾十萬qps,基於磁碟進行儲存,做到時間複雜度O(1) )、高效能、實時、高可靠等特點,可實時處理流式資料。最早由Linkedin公司用scala語言開發。
kafka是訊息中介軟體的一種,訊息中介軟體還有active mq, rocket mq等。
訊息中介軟體的作用
當client呼叫server時,如果server響應慢,則會採用非同步,這時需要引用訊息系統。這時從client–>server,變成了client(producer) --> message queue --> server(consumer)。
使用訊息中介軟體的模式
點對點
生產者傳送訊息到queue中,然後消費者從queue中取出訊息並消費訊息。
注意:
- 訊息被消費以後,queue中不再儲存,所以訊息消費者不可能消費到已經被消費的訊息。
- queue可支援多個消費者,但對一個訊息而言,只會被一個消費者消費,其他消費者消費不到。
釋出/訂閱:
訊息生產者將訊息釋出到topic中,同時有多個消費者訂閱該訊息。不同於點對點,釋出到topic中的訊息會被所有訂閱者訂閱。
JMS(Java Message Service)
Java EE平臺上關於訊息中介軟體的技術規範,它便於訊息系統中的Java應用程式進行訊息交換,並且通過提供標準的生產、傳送、接收訊息的介面簡化企業應用的開發。
備註:kafka不是完全按照JMS規範實現的,Active MQ是基於JMS做的,JMS規範更重一些。
使用message queue的好處
1)client與server解耦
2)資料冗餘(確保高可用,例如可失敗重試)
3)擴充套件性、靈活性(如多個生產者、多個消費者的數目增加和減少)
4)高併發時消峰(相當於通過非同步來解決高併發)
kafka的術語
生產者
消費者
topic
kafka將訊息分門別類,每一類的訊息稱之為topic(主題)
broker
即kafka的一臺伺服器,已釋出的訊息會儲存到一組kafka伺服器中,叢集中的每個伺服器都是一個broker(即代理)
訊息
kafka能幹什麼?
1)釋出、訂閱訊息(流),內建分割槽,副本和故障轉移:有利於處理大規模訊息以容錯的方式儲存訊息(流),訊息被持久化到本地磁碟。
2)流處理:可以持續獲取輸入topic的資料,進行加工處理,然後寫入輸出topic。(主要配合大資料來使用)
kafka常見的應用場景
1)日誌收集:然後用統一介面服務開發給各種消費者。
2)訊息系統:解耦生產者、消費者、快取訊息。
3)使用者活動跟蹤:收集埋點,然後訂閱者訊息埋點資料做實時監控分析;或裝載到hadoop或資料倉庫,做離線的分析與挖掘。
4)運營指標統計:用於運維監控,如收集系統指標和應用指標,做監控報警。
5)流式處理:結合spark streaming和storm。
kafka核心API
1)productor API: 釋出訊息到一個或多個topic
2)consumer API: 訂閱一個或多個topic, 並處理消費到的訊息
3)stream API: 充當一個流處理器,從1個或多個topic消費輸入流,並生產一個輸出流到1個或多個topic,有效將輸入流轉換到輸出流。
4)connector API: 用於不斷從源系統(如DB)或應用程式中拉取資料到kafka, 或從kafka提交資料到宿系統或應用程式。
5)AdminClient API: 允許管理和檢測topic, broker及其他kafka物件。
kafka的拓撲結構
kafka下載與安裝**
官方文件:http://kafka.apache.org/documentation/
下載地址:http://kafka.apache.org/downloads
當前kafka版本是2.0.1
下載後解壓即可,kafka不用安裝。
kafka預設在本地使用。如果要遠端訪問,需要配置config/server.properties
通過命令列來簡單使用kafka hello world**
/Users/shipeng/Documents/kafka學習/kafka_2.11-2.0.1
Step1. 啟動kafka前,要先啟動kafka自帶的zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
Step2. 啟動kafka
bin/kafka-server-start.sh config/server.properties
Step3. 建立topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
Step4. 檢視已建立的topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
Step5. 傳送訊息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
Step6. 接收訊息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Step7. 停止kafka
./bin/kafka-server-stop.sh
Step8. 停止zk
./bin/zookeeper-server-stop.sh
通過java客戶端來做kafka的生產者與消費者**
Step1. 新增maven依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
Step2. 寫生產者程式碼
官方文件地址:http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
public static void t1() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 發訊息的應答保障:all表示所有leader的副本都要寫成功並給應答。
props.put("retries", 0); // 是否重試,0表示不重試
props.put("batch.size", 16384); // 可批量傳送的訊息個數
props.put("linger.ms", 1); // 累積1毫秒再發送
props.put("buffer.memory", 33554432); // 累積1毫秒使用的buffer的大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 採用的kafka自帶序列化類
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 迴圈傳送100條訊息
for (int i = 0; i < 100; i++)
// producer.send為非同步傳送
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
System.out.println("producer complete");
producer.close();
}
Step3. 寫消費者程式碼
官方文件地址:http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
public static void t1() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test"); // 訊息時的groupid
props.put("enable.auto.commit", "true"); // 收到訊息後,是否自動提交
props.put("auto.commit.interval.ms", "1000"); // 消費時間間隔
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
第二部分 kafka實現原理
1. topic & partition & offset
kafka對同一個topic中的訊息進行分割槽(partition),每個partition都是有序的、不可變的佇列,並且可持續向其中新增訊息。(即topic中每個partition都是一個佇列)
partition中的訊息都被分了一個序列號,稱之為偏移量(offset),在partition中每個offset都是唯一的。
在建立topic時,可在config/server.properties中指定partition的數量,也可以在建立topic後修改partition的數量。
採用partition設計的好處
1)可以處理更多的訊息,不受單臺伺服器的限制:kafka基於檔案儲存,可將日誌分散到多個server上,從而避免檔案達到單機磁碟的上限。
2)分割槽可以作為並行處理單元,提高處理速度和吞吐率:每一條訊息被髮送到broker中,會根據partition規則(預設為隨機)選擇被儲存到哪一個partition。如果partition規則設定合理,則所有訊息會被均勻地分佈到不同的partition中,這樣就實現了水平擴充套件。
2. topic & partition & log
對於每一個partition, kafka叢集維護這一個分割槽的Log(即日誌檔案),每個partition在儲存層面是一個append log檔案。任何釋出到該partition的訊息都會被追加到Log檔案的尾部(這就相當於磁碟的順序寫,效能極高,因為無資料遷移與拷貝(如向檔案中間寫則需要copy))。
kafka叢集儲存所有的訊息,直到他們過期,無論訊息是否被消費了。
實際上,kafka消費者所持有的元資料只有1個offset,即消費者在這個Log中的偏移位置。
這個偏移量由消費者控制:當消費者消費訊息時,偏移量也是線性增加的。偏移量由消費者控制,消費者可以將偏移量重置為更老的一個偏移量,重新讀取訊息,一個消費者的操作不會影響其他消費者對此log的處理。
broker不需要記錄哪些訊息被消費者消費,offset完全由消費者客戶端和zookeeper記錄,這樣broker就不需要鎖,所以實現了高吞吐。
日誌存放
在物理儲存上,每個partition對應一個物理的資料夾,partition的命名規則為:topic名稱+有序序號,第一個序號從0開始。
partition是物理的概念,topic是邏輯的概念。
kafka 日誌檔案的存放位置:在config/server.properties中配置:
log.dir = /tmp/kafka-logs
實際上,生產環境中kafka日誌不會放在/tmp目錄下。
例如,剛才的測試結果日誌存放在:/tmp/kafka-logs/my-topic-0
shipengdeMacBook-Pro:my-topic-0 shipeng$ ls /tmp/kafka-logs/my-topic-0
00000000000000000000.index 00000000000000000002.snapshot
00000000000000000000.log leader-epoch-checkpoint
00000000000000000000.timeindex
訊息在磁碟上的格式
直接開啟00000000000000000000.index檔案是亂碼。
訊息是二進位制格式,並作為一個標準,所以訊息可以在producer, broker, client之間傳輸,無需再copy或轉換。
訊息格式如下:
message length : 4 bytes (value: 1+4+n) //訊息長度
"magic" value : 1 byte crc : 4 bytes
payload : n bytes
做最好的線上學習社群
//版本號 //CRC校驗碼 //具體的訊息
日誌分段
每個partition的log相當於一個資料夾,裡面的內容相當於一個巨型檔案,被分配到多個大小相等的segment(段) 檔案中。每個partition只需要支援順序讀寫就行了。但每個segment訊息數量不一定相等,這種特性方便old segment file被快速刪除,有效提高了磁碟利用率。
segment檔案儲存結構
生產者發訊息到某個topic, 訊息會被均勻地分佈到多個partition上,broker往對應的partition的最後一個segment上新增訊息。
當某個segment上面的訊息條數達到了配置閾值或訊息釋出時間超過閾值時,segment上面的訊息會被flush到磁碟上,只有flush到磁碟上的訊息,consumer才能消費。segment達到一定大小後,將不會再往該segment寫資料,broker會建立新的segment.
segment檔案由2大部分組成:index file & data file,這兩個檔案同時出現,一一對應,字尾分別為.index 和 .log,分別表示segment索引檔案和資料檔案。
segment檔案命名規則:partition全域性的第一個segment從0開始,後續每個segment檔名為上一個全域性partition的最大offset(偏移message數)。數值最大為64位long大小,19位數字字元長度,沒有數字用0填充。例如:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
其中,170410是偏移量。
這樣每個segment中儲存很多條訊息,訊息id有其邏輯位置決定,即從訊息id可直接定位到訊息的儲存位置,避免id到位置的額外對映。
備註:segment跟log4j比較相似,檔案多大或多長時間生成一個新的檔案。
segment的檔案儲存圖
.index 索引檔案儲存了大量的元資料,.log檔案儲存大量的訊息。索引檔案中的元資料指向對應資料檔案中message的物理偏移地址。其中以 .index 索引檔案中的元資料【3,348】為例,在 .log 資料檔案中表示第三個訊息,即在全域性partition中為170410+3=170413個訊息,該訊息的物理便宜地址為348。
日誌根據offset讀訊息的過程
Step1. 在儲存中找到對應的segment檔案(可找到index檔案和log檔案)(segment檔案是以offset命名,可用二分法查詢,時間複雜度O(logN)
Step2. 通過全域性的offset計算到segment內的offset.
Step3. 根據segment檔案的offset讀取訊息資料
注意:.index檔案在中的key不連續,即採用的是稀疏索引,即索引不連續。這樣做可以減少檔案大小。
例如,上圖中找第二個offset,由於沒有2的key, 那麼先讀1的key, 然後計算1key對應的訊息大小,例如1的訊息的大小是142,則用142作為2的偏移地址,這樣來找到2的訊息。
採用稀疏索引的好處:
為了減少索引檔案的大小,索引檔案變小了,可以直接Load到記憶體,記憶體操作效能更快。
日誌寫操作過程
日誌允許序列的追加訊息到檔案最後,當它達到配置檔案中設定的大小(1GB),就會滾動到新的檔案上。
日誌採用了2個配置引數:
M: 它定義了強制OS重新整理檔案到磁碟之前主動寫入的訊息數量;
S: 它定義了幾秒後強制重新整理。
這樣就提供了耐久性的保障,當系統崩潰的時候,最多丟M條訊息,或丟S秒資料。這個機制與redis類似。
日誌刪除操作
日誌會刪除一個時間段的日誌。kafka日誌管理器允許通過“插入刪除策略”來選擇刪除哪些檔案。目前的策略是刪除N天以前的日誌(相對於修改時間,預設儲存7天),檔案大小也可用作刪除策略。
即刪除可以根據時間或檔案大小,預設是時間。
對於刪除操作,採用了copy-on-write方式來避免加鎖。
3. kafka分散式:針對partition
log的partition被分配到叢集中的n個伺服器上,每個伺服器處理它負責的partition。根據配置, 每個partition還可以複製到其他伺服器作為備份容錯。
每個partition被複製成多份放到不同的機器上,每個partition有一個leader,0個或多個follower。leader處理此分割槽的所有讀寫請求,而follower被動地複製資料。如果leader宕機,其他的一個follower會被推舉為新的leader。一臺伺服器可能同時是一個partition的leader, 另一個partition的follower. 這樣可以負載均衡,避免所有的請求都只讓一臺或某幾臺伺服器處理。
kafka生產者
生產者向topic上釋出訊息,生產者也負責選擇釋出到topic上的哪個分割槽(即partition)。最簡單的方式是對partition列表中的partition輪詢,也可以根據演算法按權重分割槽。
傳統消費者
傳統的訊息模型可分為:點對點(即佇列:一條訊息只有一個消費者能消費到)** 和 **釋出/訂閱(一條訊息可被多個消費者消費) 模式。
kafka作為儲存系統
kafka作為儲存系統相比其他儲存系統的優勢:高效能。
高效能是通過寫入到kafka的資料將寫到磁碟,並複製到叢集中來保證容錯性,並允許生產者等待應答(ack: 可配置是否等待所有leader副本都寫入完成),直到訊息完全寫入。
由於kafka寫入是在檔案尾部追加,讀取時也是順序讀取,所以無論你伺服器上有50KB還是50TB,執行效能都是相同的。
由消費者來控制檔案讀取的位置offset。
所以可以認為kafka是一種專用於高效能、低延遲,提交日誌儲存,複製,傳播特殊用途的分散式檔案系統。
kafka流處理
流處理指消費topic中的訊息,處理後,再寫入到topic中。
對於簡單的處理,可直接用producer, consumer API來實現流處理。對於複雜的轉換,kafka提供了更強大的Streams API, 可構建聚合計算或連線“流”到一起的複雜應用程式。
kafka消費者 consumer group
kafka為傳統消費者模型中的兩種方式(點對點和釋出/訂閱)提供了統一的消費者抽象模型:consumer group。
一個釋出在topic中的訊息,被分發給consumer group中的一個消費者(此時就是點對點模型)。如果所有的消費者都在不同的group中,就變成了釋出/訂閱模型。
kafka broker是完全無狀態的,即不會記錄每個消費者的消費記錄,而由consumer group這個消費者來記錄。
上圖可以看出,consumer group不在kafka server中。
kafka客戶端(consumer)通過TCP長連線到broker來從叢集中消費訊息,並透明地處理kafka叢集中的故障伺服器,透明地調節適應叢集中變化的資料分割槽。
多個consumer的消費必須是順序地讀取partition中的message,新啟動的cosnumer預設從partition佇列最頭端最新的地方開始阻塞地讀message。如果覺得效能不高的話,可以通過加partition數量來做水平擴充套件。
一個consumer group下面的所有consumer thread一定會消費topic中所有的partition, 所以,推薦的設計是,consumer group下的consumer thread數量等於partition數量,這樣效率是最高的。
一個consumer可以消費不同的partition, 但kafka不保證資料間的順序性,即kafka只保證在一個partition上的資料是有序的,但讀多個partition時,根據讀partition的順序,結果順序會不同。(即只保證單個partition中訊息順序,不保證跨partition中的訊息順序)。
偏移量和消費者的位置
每條訊息在partition中都有一個偏移量offset,即分割槽中的唯一位置,該offset是訊息的唯一識別符號。
kafka消費者會持有下一條偏移量的位置,它比消費者看到的最大偏移量的位置還大1個,在消費者每次呼叫poll來接收訊息後,會自動+1。(備註:這個位置不需要消費者使用者來手動維護)
在消費者成功poll拉取到訊息後,會對kafka服務端做“提交”,這個“已移交”位置是已安全儲存的最後偏移量。(這個offset在早期版本中儲存在zk中,但由於這樣zk承受了太高的併發,在後續版本中(大概1.0版本),改成把offset記錄到了一個專門的topic中),如果kafka消費者程序掛掉後重啟,消費者仍可恢復到這個偏移量繼續消費。
消費者可選擇定期自動提交偏移量給kafka服務端,也可以選擇通過呼叫commit API來手動控制什麼時候提交(預設為每消費到1條後自動提交)
消費者group & topic訂閱
kafka通過將相同的group.id的消費者視為同一個消費者group。kafka通過程序池來消費訊息,這些程序池中的程序可以在同一個機器或不同的機器(這裡的機器指kafka叢集中的伺服器)上,這樣可實現擴充套件性和容錯性。
分組中的消費者們訂閱同一個topic時,kafka將該topic的訊息傳送到每個消費者組中;對於每一個消費者組,kafka通過partition來load balance該分組中的所有成員,這樣每個消費者組中的每個partition正好分配一個消費者。
備註:即一個group中同一個partition只能有一個consumer程序,且同一個group中的consumer數不要大於partition數,否則會造成有的consumer消費不到訊息。
消費者組的成員是動態維護的,如一個消費者故障,或當有新的消費者加入時,kafka將通過定時重新整理機制,將其分配給分組中的新成員,去掉舊成員。
可將消費者group看做是由多個程序組成的單一的邏輯訂閱者,所有程序都是單個消費者group的一部分(可看作是點對點的訊息佇列模型,group中的每個消費者都是一個程序),因此訊息傳遞時就像佇列一樣,在group中的所有消費者之間Load balance。
當分組重新分配自動發生時,可通過ConsumerRebalanceListener通知消費者(即一個回撥函式),這樣可允許我們自己來寫一些必要的應用程式級邏輯,如狀態清除、手動提交偏移量等等邏輯。
kafka也需要消費者通過使用assign(Collection)手動分配指定分割槽,但如果使用了手動指定,那麼動態分割槽分配和協調消費者組機制將失效(通常不手動指定)
發現消費者故障
當消費者訂閱一組topic後,當呼叫poll(long)時,消費者將自動加入到group中。只要持續呼叫poll,消費者將一直保持可用,並繼續從分配的分割槽中接收訊息。此外,消費者向伺服器定時傳送心跳:如果消費者崩潰或無法在session.timeout.ms配置的時間內傳送心跳,則消費者將被kafka服務端視為死亡,並將重新load balance消費者們。
消費者“活鎖”:消費者仍在持續傳送心跳,但它從不從partition中獲取訊息。而一個group中的一個partition只能有一個消費者程序,其他消費者無法訊息該partition中的訊息,這樣這個消費者就是“佔著茅坑不拉屎”,稱為“活鎖”。為了防止這種情況的發生,kafka採用了max.poll.interval.ms活躍檢測機制,如果消費者呼叫poll獲取訊息的頻率大於該時間,則kafka會把該消費者踢出該組,以便其他消費者接替他來消費訊息。所以,如果消費者要留在group中不被踢,必須持續呼叫poll. 當消費者被踢出後,你會看到offset提交失敗(呼叫commitSync()引發的CommitFailedException),這是一種安全機制,保證只有活動的消費者才能提交offset。
對於訊息消費者獲取到訊息後,如果處理消費到的訊息的時間不可預測時(即不知道處理訊息的業務處理需要花費多少時間),這樣推薦的方法是把消費到的訊息放到另一個執行緒中處理(非同步),讓消費者繼續呼叫poll(但要暫停從poll接收新訊息,以防止消費者被踢)。但必須注意確保已提交的offset不超過實際的位置,你必須禁用自動提交,並只有在執行緒處理完成後,才能做手動提交。還要注意,你需要暫停從poll接收到新訊息,讓執行緒處理完之前的訊息。
備註:這種對消費到的訊息業務處理時間很長時,不能在業務處理完成之前就消費kafka中的下一條訊息,另外,還要禁用自動提交,在業務處理完成後,再手動提交offset到kafka, 以防止丟訊息。
kafka保證
- 每個partition中的訊息無論生產,還是消費,都與寫入partition的訊息順序相同。
- 如果一個topic配置了n個副本(replication factor), 那麼允許n-1個伺服器宕機,而不丟失任何(已commit的)訊息。
第三部分 使用kafka提供的Java client API進行開發
1. Admin Client API 開發
官網文件:
http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
package kafka_test.kafka_test.admin_client;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
public class AdminClientTest {
private static final String TOPIC_NAME = "my-admin-topic";
public static void main(String[] args) throws Exception {
// createTopic();
// listTopic();
// deleteTopics();
// listTopic();
// listTopicWithCustomerOffsets();
// describeTopic();
// addPartitions();
// describeTopic();
// alterConfig();
// describeConfig();
describeCluster();
}
public static AdminClient getAdminClient() {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient client = AdminClient.create(props);
return client;
}
private static void createTopic() throws InterruptedException, ExecutionException {
NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
CreateTopicsResult ret = getAdminClient().createTopics(Arrays.asList(topic));
ret.all().get();
System.out.println("create topic is ok");
}
private static void listTopic() throws InterruptedException, ExecutionException {
ListTopicsResult ret = getAdminClient().listTopics();
Set<String> set = ret.names().get();
System.out.println("topic names = " + set);
}
private static void listTopicWithCustomerOffsets() throws InterruptedException, ExecutionException {
// __consumer_offsets
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult ret = getAdminClient().listTopics(options);
Set<String> set = ret.names().get();
System.out.println("topic names = " + set);
}
private static void describeTopic() throws Exception {
DescribeTopicsResult ret = getAdminClient().describeTopics(Arrays.asList(TOPIC_NAME));
Map<String, TopicDescription> ts = ret.all().get();
for (Map.Entry<String, TopicDescription> entry : ts.entrySet()) {
System.out.println("key=" + entry.getKey() + ", value=" + entry.getValue());
}
}
private static void describeConfig() throws Exception {
DescribeConfigsResult ret = getAdminClient()
.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME)));
Map<ConfigResource, Config> configs = ret.all().get();
for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
ConfigResource key = entry.getKey();
Config value = entry.getValue();
System.out.println("resource key name=" + key.name() + ", type=" + key.type());
Collection<ConfigEntry> configEntries = value.entries();
for (ConfigEntry a : configEntries) {
System.out.println("resource value name=" + a.name() + ", value=" + a.value());
}
}
}
private static void alterConfig() throws Exception {
Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "false")));
AlterConfigsResult ret = getAdminClient().alterConfigs(
Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME), config));
ret.all().get();
System.out.println("alter config is ok");
}
// kafka的parition數目只能增加不能減少。
private static void addPartitions() throws Exception {
Map<String, NewPartitions> map = new HashMap<>();
map.put(TOPIC_NAME, NewPartitions.increaseTo(3));
CreatePartitionsResult ret = getAdminClient().createPartitions(map);
ret.all().get();
System.out.println("add partition is ok");
}
private static void deleteTopics() throws Exception{
DeleteTopicsResult ret = getAdminClient().deleteTopics(Arrays.asList(TOPIC_NAME)