1. 程式人生 > >Kafka Producer+Consumer

Kafka Producer+Consumer

1.啟動zookeeper及Kafka:

cd /usr/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

2. 編寫Producer:

acks:-1代表所有broker都收到訊息了,leader再返回響應
retries:代表錯誤重試次數
batch.size:代表一批訊息的大小
linger.ms:代表每10ms即使batch未滿也傳送訊息
buffer.memory:緩衝區大小
max.block.ms:緩衝區滿後阻塞3s
3.編寫Consumer:
ConsumerRunnable:


ConsumerGroup:

主類:

設定3個執行緒輪詢
4.執行測試:
問題1:報錯:

根據錯誤資訊,是Listener設定錯誤,導致訊息傳送失敗
修改server.properties:
取消listeners一行註釋,並修改成listeners=PLAINTEXT://localhost:9092

重啟kafka,重新執行,傳送成功

問題2:只有Thread-1收到訊息,且分割槽全部為0,使用jconsole看到確實啟動了三個執行緒,並且全部處於RUNNABLE
原因:沒有配置分割槽,或者沒有為producer配置分割槽機制
執行命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 20 --topic test 

設定分割槽數量


在Producer中新增分割槽器:

properties.put("partitioner.class","org.apache.kafka.clients.producer.internals.DefaultPartitioner");

除此之外不需要任何修改,重新執行