1. 程式人生 > 實用技巧 >Kafka Notes Basics操作筆記

Kafka Notes Basics操作筆記

Preface:

簡單地整理了一下Kafka的基礎操作指令和簡單的Java實現。未來會整理一篇Kafka的概念篇以及做一篇我開發的供數小程式walk through。

安裝流程參考:

1.Kafka安裝+配置 Ubuntu16.04 環境kafka部署以及專案demo https://blog.csdn.net/sinat_30026065/article/details/82148587

2.Ubuntu 16下單機安裝配置zookeeper和kafka https://www.cnblogs.com/vipzhou/p/7235625.html

*Kafka各種報錯       https://www.icode9.com/content-4-440422.html

Cluster ID不一致報錯    https://blog.csdn.net/lixiaogang_theanswer/article/details/105679680

開啟kafka配置檔案(server.properties), 找到該選項引數配置(log.dirs=/home/ssd/kafka-本地儲存kafka分割槽、log、index等資料檔案的目錄)的目錄位置,cd /home/ssd/kafka下,刪除(rm -rf ./*)掉該目錄下的所有檔案.然後嘗試重新啟動就可以了。

問題出現原因分析:之前用其他版本的kafak在該目錄下建立了一些主題資訊(kafka內部會在使用者指定目錄下儲存許多與保證服務正常工作的相關檔案),後面升級到新的kafka版本,然後複用的是該log目錄位置,但是沒有對該log目錄位置下的資料進行情理,導致新版本的kafka服務起來之後報錯。將目錄清理之後,重啟服務正常工作。

指令:

啟動卡夫卡:bin/

./zookeeper-server-start.sh /home/yuchenwu/Kafka/kafka_2.12-2.5.0/config/zookeeper.properties zookeeper

不在console顯示日誌,轉存日誌啟動:

bin/nohup ./kafka-server-start.sh /home/yuchenwu/Kafka/kafka_2.12-2.5.0/config/server.properties > ../logs/kafkaqidongrizhi.log &

1.建立topic

topic kafka-topics.sh --zookeeper<zookeeper connect> --create --topic <string
> --replication-factor <integer> --partitions <integer>
bin/ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

其中localhost:2181為zookeeper的名字和埠,test為topic的名字自行修改

2.增加分割槽

從消費者角度來看,為基於鍵的主題新增分割槽是很困難的。因為如果改變了分割槽的數量,鍵到分割槽之間的對映也會發生變化。
所以,對於基於鍵的主題來說,建議在一開始就設定好分割槽數量,避免以後對其進行調整。

kafka-toplcs.sh --zookeeper master:2181 --alter -- topic <topicName> --partitions 16

**將 my-topic 主題的分割槽數量增加到 16。
3.減少分割槽

我們無法減少主題的分割槽數量。因為如果刪除了分割槽,分割槽裡的資料也一併被刪除,導致資料不一致。
我們也無法將這些資料分配給其他分割槽,因為這樣做很難,而且會出現訊息亂序。所以,如果一定要減少分割槽數量,只能刪 除整個主題,然後重新建立它。
4.刪除主題

為了能夠刪除主題, broker 的 delete.topic. enable 引數必須被設定為 true。如果該引數被設為 false,刪除主題的請求會被忽略。

kafka-topics.sh --zookeeper master:2181 --delete -- topic my-topic 

5.列出主題

-kafka-topics.sh --zookeeper master:2181 --list

6.列出主題詳細資訊-

kafka-topics.sh --zookeeper master:2181 --describe <topicName>

7.消費者群組-列出新版本的消費者群組

 kafka-consumer-groups.sh new-consumer --bootstrap-server master:9092 --list 

8.分割槽管理 ---TBC
9.消費和生產

在使用 Kafka時,有時候為了驗證應用程式,需要手動讀取訊息或手動生成訊息。
這個時 候可以藉助 kafka-console-consumer.sh 和 kafka-console-producer.sh 這兩個工具,它們包裝 了 Java 客戶端,讓使用者不需要編寫整個應用程式就可以與 Kafka 主題發生互動。
10.控制檯生產者-往主題test上釋出訊息: 預設情況下,該工具將命令列輸入的每一行視為一個訊息,訊息的鍵和值以 Tab 字元分隔(如果沒有出現 Tab 字元,那麼鍵就是 null)。

kafka-console-producer.sh --broker-list localhost:9092 --topic <name>
Test Message 1
Test Message 2
^D 

11.控制檯消費者

kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning   ##@Deprecated老版本不要用

./kafka-console-consumer.sh --bootstrap-server master:9092 --topic gongshu --from-beginning

同樣,注意broker名字和埠
12.重置

offset ./kafka-consumer-groups.sh --bootstrap-server master:9092 --group <consumerGroup> --reset-offsets --all-topics --to-offset 0 --execute

–to-earliest:把位移調整到分割槽當前最小位移
–to-latest:把位移調整到分割槽當前最新位移
–to-current:把位移調整到分割槽當前位移
–to-offset <offset>: 把位移調整到指定位移處
–shift-by N: 把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
–to-datetime <datetime>:把位移調整到大於給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
–by-duration <duration>:把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS,比如PT0H5M0S
–from-file <file>:從CSV檔案中讀取調整策略

3.往主題test上釋出訊息:

kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test Message 1
Test Message 2
^D 

注意自行修改broker埠localhost:9092

4.從主題test上讀取訊息:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning



Java實現Kafkahttps://blog.csdn.net/rain_web/article/details/83473709

Maven依賴

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.9.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.9.0.0</version>
    </dependency>

一些例子

1.建立生產者:

private Properties kafkaProps = new Properties(); 
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put( "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer= new KafkaProducer<String,String>(kafkaProps); 

建立一個新的properties——鍵值為string,所以使用內建的stringSerializer——建立新的生產者物件,把properties傳給它

2.傳送訊息:
一、傳送並忘記fire-and-forget

ProducerRecord<String, String> record = new ProducerRecord<>( "CustomerCountry","Precision Products", "France");
try {
producer.send(record);
} 
catch (Exception e){
e.printStackTrace(); 
}

建立producerRecord物件——此建構函式需要topic名字,鍵,值。它們都是字串,必須與生產者和序列化器匹配——用send()方法傳送。如果不指定key,則會設定成null——
send會返回一個包含RecordMatadata的Future物件,不過我們忽略返回值。如果不關心傳送結果,那麼可以使用這種傳送方式。
比如,記錄 Twitter 訊息日誌,或記錄不太重要的應用程式日誌。

二、同步傳送-我們使用 send() 方怯傳送訊息, 它會返回…個 Future 物件,呼叫 get() 方法進行等待, 就可以知道悄息是否傳送成功。

ProducerRecord<String, String> record = new ProducerRecord<>( "CustomerCountry","Precision Products", "France");
try {
producer.send(record).get();
} 
catch (Exception e){
e.printStackTrace(); 
}

get()方怯會丟擲異常。如果沒有發生錯 誤,我們會得到一個 RecordMetadata 物件,可以用它獲取訊息的偏移量。
如果在傳送資料之前或者在傳送過程中發生了任何錯誤,比如 broker 返回 了一個不允許重發訊息的異常或者已經超過了重發的次數,那麼就會丟擲異常。
我們只是簡單地把 異常資訊打印出來。

三、非同步傳送訊息-呼叫 send() 方怯,並指定一個回撥函式, 伺服器在返回響應時呼叫該函式。

private class DemoProducerCallback implements Callback {
@Override 
public void onCompletion(RecordMetadata recordMetadata , Exception e){
if(e != null) { 
e.printStackTrace() ;
} 
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry","Biomedical Materials", "USA");
producer.send(record,new DemoProducerCallback());

如果 Kafka 返回一個錯誤, onCompletfon 方邑會丟擲一個非空(non null)異常。

2.建立消費者

Properties props = new Properties();
props.put( "bootstrap.servers", "broker1:9092,broker2: 9092");
props.put( "group.id","CountryCounter");
props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 

是新增了group.id 屬性,它指定了消費者所屬群組的 名字。其實並非必須的,只是建立一個不屬於任何群組的消費者不太常見

3.訂閱主題

consumer.subscribe(Collections.singletonlist("customerCountries")); 

為了簡單起見,我們建立了一個只包含單個元素的列表,主題的名字叫作"customerCountries"。
我們也可以在呼叫 subscribe() 方法時傳入一個正則表示式。正則表示式可以匹配多個主題,
如果有人建立了新的主題,並且主題的名字與正則表示式匹配,那麼會立即觸發一次 再均衡,消費者就可以讀取新新增的主題。
consumer.subscribe( "test.*"); 訂閱test相關所有主題

4.消費者輪詢-訊息輪詢是消費者 API 的核心,通過一個簡單的輪詢向伺服器請求資料。
一旦消費者訂閱 了主題,輪詢就會處理所有的細節,包括群組協調、分割槽再均衡、傳送心跳和獲取資料,開發者只需要使用一組簡單的 API 來處理從分割槽返回的資料

try { 
while(true){ 
ConsumerRecords<String , String> records = consumer.poll(100)
for(ConsumerRecord<String,String> record : records){ 
log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n" , record.topic(), record.partition(),record.offset(),record.key(),record.value());
int updatedCount = 1;
if(custCountryMap.countainsValue(record.value())){ 
updatedCount = custCountryMap.get(record.value()) + 1; 
custCountryMap.put(record.value(), updatedCount);
JSONObject json = new JSONObject(custCountryMap); 
System.out.println(json.toString(4));
} 
}
}
}
finally {
consumer.close();
}

poll() 方能返回一個記錄列表。每條記錄都包含了記錄所屬主題的資訊、記錄所在分 區的資訊、 記錄在分割槽裡的偏移量,以及記錄的鍵值對。
我們一般會遍歷這個列表,逐 條處理這些記錄。poll()方法有一個超時引數,它指定了方蓓在多久之後可以返回, 不管有沒有可用的資料都要返回。
超時時間的設定取決於應用程式對響應速度的要求, 比如要在多長時間內把控制權歸還給執行輪詢的執行緒。

5.提交偏移量
1.自動提交。 enable.auto.commit 被設為 true,不過並沒有為開發者留有餘地來避免重複處理訊息,auto.commit.interval.ms = 5
2.提交當前偏移量 auto.commit.offset 設為 false ,讓應用程式決定何時提交偏移量。使用 commitSync() ,會一直提交,直到提交成功或者發生無法恢復的錯誤

while (true){ 
ConsumerRecords<String, String>records = consumer.poll(100);
for(ConsumerRecord<String, String> record :records){ 
System.out.printf("topic= %s , partition= %s, offset = %d, customer = % s , country = %s\n", record.topic(), record.partition(), record.offset(),
record.key(), record.value()); 
} 
try {
consumer.commitSync();
}catch (CoMMitFailedException e) { 
log.error("commit failed", e)
}
}

3.非同步提交 -我們只管傳送提交請求,無需等待 broker 的響應。

while (true){ 
ConsumerRecords<String, String>records = consumer.poll(100);
for(ConsumerRecord<String, String> record :records){ 
System.out.printf("topic= %s , partition= %s, offset = %d, customer = % s , country = %s\n", record.topic(), record.partition(), record.offset(),
record.key(), record.value()); 
} 
consumer.commitAsync();
}

4.同步和非同步組合提交
如果一切正常,我們使用 commitAsync() 方陸來提交。這樣速度更快,而且即使這次提 交失敗,下一次提交很可能會成功。
如果直接關閉消費者,就沒有所謂的"下一次提交"了。使用 commitSync()方也會一 直重式,直到提交成功或發生無陸恢復的錯誤。


References Other Than Listed Above

  • Kafa權威指南 Kafka The Definitive Guide