Kafka Producer+Consumer
阿新 • • 發佈:2018-11-05
1.啟動zookeeper及Kafka:
cd /usr/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
2. 編寫Producer:
acks:-1代表所有broker都收到訊息了,leader再返回響應
retries:代表錯誤重試次數
batch.size:代表一批訊息的大小
linger.ms:代表每10ms即使batch未滿也傳送訊息
buffer.memory:緩衝區大小
max.block.ms:緩衝區滿後阻塞3s
3.編寫Consumer:
ConsumerRunnable:
ConsumerGroup:
主類:
設定3個執行緒輪詢
4.執行測試:
問題1:報錯:
根據錯誤資訊,是Listener設定錯誤,導致訊息傳送失敗
修改server.properties:
取消listeners一行註釋,並修改成listeners=PLAINTEXT://localhost:9092
重啟kafka,重新執行,傳送成功
問題2:只有Thread-1收到訊息,且分割槽全部為0,使用jconsole看到確實啟動了三個執行緒,並且全部處於RUNNABLE
原因:沒有配置分割槽,或者沒有為producer配置分割槽機制
執行命令:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 20 --topic test
設定分割槽數量
在Producer中新增分割槽器:
properties.put("partitioner.class","org.apache.kafka.clients.producer.internals.DefaultPartitioner");
除此之外不需要任何修改,重新執行