Kafka: Querying Consumer Offsets and Lag with kafka-consumer-groups.sh
This article is based on Kafka version kafka_2.11-0.10.0.1.
What the version number means:
Scala 2.11
Kafka 0.10.0.1
Background:
Kafka 0.9 and above introduced a major version change,
mainly in the following areas:
1. kafka-client no longer distinguishes between the high-level and low-level (simple) APIs.
2. Consumer offsets are no longer stored solely in ZooKeeper; Kafka now tracks its own consumption state (in the internal __consumer_offsets topic).
There are still special cases: kafka-console-consumer, as of 0.10.0.1, still stores its offsets in ZooKeeper.
[Diagram in the original: how the version changes affect the consumer]
Main text:
Given the background above, it is clear that the way you check Kafka consumer offsets differs between versions below 0.9 and versions 0.9 and above.
Since most environments today have already upgraded to 0.9+, this article covers versions >= 0.9.
How to query:
The method described here uses the tool script that ships with Kafka itself.
The script's location and name: bin/kafka-consumer-groups.sh
[root@master my_bin]# cd $KAFKA_HOME
[root@master kafka]# cd bin/
[root@master bin]# ll
total 116
-rwxr-xr-x. 1 root root 1052 Aug 4 2016 connect-distributed.sh
-rwxr-xr-x. 1 root root 1051 Aug 4 2016 connect-standalone.sh
-rwxr-xr-x. 1 root root  861 Aug 4 2016 kafka-acls.sh
-rwxr-xr-x. 1 root root  864 Aug 4 2016 kafka-configs.sh
-rwxr-xr-x. 1 root root  945 Aug 4 2016 kafka-console-consumer.sh
-rwxr-xr-x. 1 root root  944 Aug 4 2016 kafka-console-producer.sh
-rwxr-xr-x. 1 root root  871 Aug 4 2016 kafka-consumer-groups.sh
-rwxr-xr-x. 1 root root  872 Aug 4 2016 kafka-consumer-offset-checker.sh
-rwxr-xr-x. 1 root root  948 Aug 4 2016 kafka-consumer-perf-test.sh
-rwxr-xr-x. 1 root root  862 Aug 4 2016 kafka-mirror-maker.sh
-rwxr-xr-x. 1 root root  886 Aug 4 2016 kafka-preferred-replica-election.sh
-rwxr-xr-x. 1 root root  959 Aug 4 2016 kafka-producer-perf-test.sh
-rwxr-xr-x. 1 root root  874 Aug 4 2016 kafka-reassign-partitions.sh
-rwxr-xr-x. 1 root root  868 Aug 4 2016 kafka-replay-log-producer.sh
-rwxr-xr-x. 1 root root  874 Aug 4 2016 kafka-replica-verification.sh
-rwxr-xr-x. 1 root root 6358 Aug 4 2016 kafka-run-class.sh
-rwxr-xr-x. 1 root root 1364 Aug 4 2016 kafka-server-start.sh
-rwxr-xr-x. 1 root root  975 Aug 4 2016 kafka-server-stop.sh
-rwxr-xr-x. 1 root root  870 Aug 4 2016 kafka-simple-consumer-shell.sh
-rwxr-xr-x. 1 root root  945 Aug 4 2016 kafka-streams-application-reset.sh
-rwxr-xr-x. 1 root root  863 Aug 4 2016 kafka-topics.sh
-rwxr-xr-x. 1 root root  958 Aug 4 2016 kafka-verifiable-consumer.sh
-rwxr-xr-x. 1 root root  958 Aug 4 2016 kafka-verifiable-producer.sh
drwxr-xr-x. 2 root root 4096 Aug 4 2016 windows
-rwxr-xr-x. 1 root root  867 Aug 4 2016 zookeeper-security-migration.sh
-rwxr-xr-x. 1 root root 1381 Aug 4 2016 zookeeper-server-start.sh
-rwxr-xr-x. 1 root root  978 Aug 4 2016 zookeeper-server-stop.sh
-rwxr-xr-x. 1 root root  968 Aug 4 2016 zookeeper-shell.sh
Let's first run the script to see its help output:
Option Description
------ -----------
--bootstrap-server <server to connect REQUIRED (only when using new-
to> consumer): The server to connect to.
--command-config <command config Property file containing configs to be
property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2
Pass in groups with a single topic to
just delete the given topic's
partition offsets and ownership
information for the given consumer
groups. For instance --group g1 --
group g2 --topic t1
Pass in just a topic to delete the
given topic's partition offsets and
ownership information for every
consumer group. For instance --topic
t1
WARNING: Group deletion only works for
old ZK-based consumer groups, and
one has to use it carefully to only
delete groups that are not active.
--describe Describe consumer group and list
offset lag related to given group.
--group <consumer group> The consumer group we wish to act on.
--list List all consumer groups.
--new-consumer Use new consumer.
--topic <topic> The topic whose consumer group
information should be deleted.
--zookeeper <urls> REQUIRED (unless new-consumer is
used): The connection string for the
zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
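Incidentally, kafka-consumer-groups.sh is only a thin wrapper that launches the class kafka.admin.ConsumerGroupCommand via kafka-run-class.sh (the class name also appears in the stack trace in the "Common problems" section below). If you ever need the same query from Java instead of a shell, a minimal sketch is to call that class's main() directly; the broker address and group name here are the ones used later in this article:

public class DescribeGroupFromJava {
    public static void main(String[] args) {
        // same arguments as the shell usage shown below
        kafka.admin.ConsumerGroupCommand.main(new String[] {
                "--new-consumer",
                "--bootstrap-server", "192.168.75.128:9092",
                "--group", "group",
                "--describe"
        });
    }
}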
Next we build a small producer/consumer example; the full Java source is at the end of this article.
Start the consumer first, then the producer, and then query the consumed offsets with bin/kafka-consumer-groups.sh.
Note:
For consumers whose offsets are maintained by Kafka itself (the new consumer), the consumer must still be running while you run the query.
Otherwise the group specified by group.id will not be found and the query fails!
Kafka records a group's consumed offsets in one of two places:
1) maintained by Kafka itself (new)
2) maintained by ZooKeeper (old), gradually being phased out
So the script likewise has two query modes: Kafka-maintained / ZooKeeper-maintained.
When Kafka maintains the offsets:
1. List the group IDs that are currently consuming:
[root@master bin]# kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.75.128:9092 --list
group
Note:
No topic is given here, so this lists the consumer group.ids across all topics.
Also note: a group.id used for several topics is listed only once.
2. Inspect the consumption state of a given group.id:
[root@master bin]# kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.75.128:9092 --group group --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
group producer_consumer_java_test_20181009 0 2436 2437 1 consumer-1_/192.168.75.1
group test_find1 0 303094 303094 0 consumer-1_/192.168.75.1
group test_find1 1 303068 303068 0 consumer-1_/192.168.75.1
group test_find1 2 303713 303713 0 consumer-1_/192.168.75.1
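The column to watch is LAG = LOG-END-OFFSET - CURRENT-OFFSET, the per-partition backlog. If you want the same numbers from Java, a minimal sketch (assuming the broker, topic, and group.id from this article) is to read the group's committed offset and compare it with the log-end offset. Unlike --describe on 0.10.0.1, this also works while no consumer in the group is running, because committed offsets are kept in the internal __consumer_offsets topic:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LagCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.75.128:9092");
        props.put("group.id", "group"); // the group whose lag we want
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            TopicPartition tp = new TopicPartition("test_find1", 0);
            // assign() does not join the group, so real consumers are not disturbed
            consumer.assign(Collections.singletonList(tp));
            OffsetAndMetadata committed = consumer.committed(tp); // CURRENT-OFFSET (null if none)
            consumer.seekToEnd(Collections.singletonList(tp));
            long logEnd = consumer.position(tp);                  // LOG-END-OFFSET
            long current = (committed == null) ? 0L : committed.offset();
            System.out.println("partition 0 lag = " + (logEnd - current));
        } finally {
            consumer.close();
        }
    }
}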
When ZooKeeper maintains the offsets:
1. List the group IDs that are currently consuming:
[root@master bin]# kafka-consumer-groups.sh --zookeeper localhost:2181 --list
console-consumer-28542
2. Inspect the consumption state of a given group.id:
[root@master bin]# kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-28542 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
console-consumer-28542 test_find1 0 303094 303094 0 console-consumer-28542_master-1539167387803-268319a0-0
console-consumer-28542 test_find1 1 303068 303068 0 console-consumer-28542_master-1539167387803-268319a0-0
console-consumer-28542 test_find1 2 303713 303713 0 console-consumer-28542_master-1539167387803-268319a0-0
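For these old ZooKeeper-based groups, the committed offsets are plain znodes under /consumers/<group>/offsets/<topic>/<partition>, so they can also be read directly. A minimal sketch with the plain ZooKeeper client, using the group and topic from the output above (a real program should wait for the connection event before reading; error handling omitted):

import org.apache.zookeeper.ZooKeeper;

public class ZkOffsetRead {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 6000, null);
        // znode layout: /consumers/<group>/offsets/<topic>/<partition>
        byte[] data = zk.getData(
                "/consumers/console-consumer-28542/offsets/test_find1/0",
                false, null);
        System.out.println("committed offset = " + new String(data, "UTF-8"));
        zk.close();
    }
}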
Common problems:
[root@master bin]# kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group group --describe
java.lang.RuntimeException: Request GROUP_COORDINATOR failed on brokers List(localhost:9092 (id: -1 rack: null))
at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:67)
at kafka.admin.AdminClient.findCoordinator(AdminClient.scala:72)
at kafka.admin.AdminClient.describeGroup(AdminClient.scala:125)
at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:147)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:315)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:86)
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:303)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:65)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Cause:
host.name is set in Kafka's config/server.properties (the file is reproduced below). With host.name=192.168.75.128 set, the broker binds to and advertises only that address, so the GROUP_COORDINATOR lookup via localhost:9092 fails. Run the query with --bootstrap-server 192.168.75.128:9092 instead (as in the successful examples above), or remove/adjust host.name.
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=127.0.0.1:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
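# NOTE: the setting below is what triggers the GROUP_COORDINATOR error shown above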
host.name=192.168.75.128
# allow topics to be deleted completely
delete.topic.enable=true
==============================================
Aside:
A semi-multithreaded producer and consumer
Producer:
KafkaProducerSingleton.java
package test.kafka.vm.half_multi_thread;

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

public final class KafkaProducerSingleton {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(KafkaProducerSingleton.class);

    private static KafkaProducer<String, String> kafkaProducer;

    private Random random = new Random();

    private String topic;

    private int retry;

    private KafkaProducerSingleton() {
    }

    /**
     * Static inner holder class (lazy initialization)
     *
     * @author tanjie
     */
    private static class LazyHandler {
        private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }

    /**
     * Singleton: KafkaProducer is thread-safe, so one instance can be shared
     * by multiple threads.
     */
    public static final KafkaProducerSingleton getInstance() {
        return LazyHandler.instance;
    }

    /**
     * Initialize the Kafka producer.
     */
    public void init(String topic, int retry) {
        this.topic = topic;
        this.retry = retry;
        if (null == kafkaProducer) {
            Properties props = new Properties();
            InputStream inStream = null;
            try {
                inStream = this.getClass().getClassLoader()
                        .getResourceAsStream("test/config/kafka/kafka.properties");
                props.load(inStream);
                // acks=1: only the partition leader acknowledges the write
                // (acks=all would wait for the full ISR)
                props.put(ProducerConfig.ACKS_CONFIG, "1");
                kafkaProducer = new KafkaProducer<String, String>(props);
            } catch (IOException e) {
                LOGGER.error("kafkaProducer initialization failed: " + e.getMessage(), e);
            } finally {
                if (null != inStream) {
                    try {
                        inStream.close();
                    } catch (IOException e) {
                        LOGGER.error("kafkaProducer initialization failed: " + e.getMessage(), e);
                    }
                }
            }
        }
    }

    /**
     * Send a message through kafkaProducer.
     *
     * @param message the message payload
     */
    public void sendKafkaMessage(final String message) {
        /*
         * 1. If a partition is specified, the message goes only to that partition.
         * 2. If both a partition and a key are given, the message still goes to the
         *    given partition; the key is not used for routing.
         * 3. If neither a partition nor a key is given, messages are spread across
         *    the topic's partitions.
         * 4. If only a key is given, the partition is chosen by hash(key).
         */
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", message);
        // send() is asynchronous: it appends the record to a buffer and returns
        // immediately, which lets the producer batch messages for throughput.
        // KafkaProducer is thread-safe, so a single shared instance can send.
        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata,
                                     Exception exception) {
                if (null != exception) {
                    LOGGER.error("kafka send failed: " + exception.getMessage(),
                            exception);
                    retryKafkaMessage(message);
                }
            }
        });
    }

    /**
     * Retry after a failed send.
     *
     * @param retryMessage the message to resend
     */
    private void retryKafkaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                // log and let the loop retry; the original code also recursed
                // here, which could multiply the retries exponentially
                LOGGER.error("kafka send failed (attempt " + i + "): " + e.getMessage(), e);
            }
        }
    }

    /**
     * Destroy the producer instance.
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.close();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}
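The class above loads its connection settings from test/config/kafka/kafka.properties, a file this article does not reproduce. If you want to run the example without it, a hypothetical inline equivalent (broker address as used throughout this article, standard string serializers) is:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical stand-in for the kafka.properties file the singleton loads
public class InlineProducerConfig {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.75.128:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // same ack setting as init() uses
        return new KafkaProducer<String, String>(props);
    }
}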
ProducerHandler.java
package test.kafka.vm.half_multi_thread;

public class ProducerHandler implements Runnable {

    private String message;

    public ProducerHandler(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton
                .getInstance();
        kafkaProducerSingleton.init("test_find1", 3);
        int i = 0;
        while (true) {
            try {
                System.out.println("Current thread: " + Thread.currentThread().getName()
                        + ", producer instance: " + kafkaProducerSingleton);
                kafkaProducerSingleton.sendKafkaMessage("message: " + message + " " + (++i));
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace(); // don't swallow exceptions silently
            }
        }
    }
}
The launcher main class:
ProducerMain.java
package test.kafka.vm.half_multi_thread;

/**
 * Created by szh on 2018/10/10.
 */
public class ProducerMain {

    public static void main(String[] args) {
        Thread thread = new Thread(new ProducerHandler("qqq"));
        thread.start();
    }
}
Consumer:
Kafka_ConsumerAuto.java
package test.kafka.vm.half_multi_thread;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class Kafka_ConsumerAuto {

    /**
     * KafkaConsumer is NOT thread-safe: only the thread running execute()
     * touches it; the worker threads just read the records already fetched.
     */
    private final KafkaConsumer<String, String> consumer;

    private ExecutorService executorService;

    public Kafka_ConsumerAuto() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.75.128:9092");
        props.put("group.id", "group");
        // auto-commit is enabled: offsets are committed in the background
        // every 100 ms
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test_find1"));
    }

    public void execute() {
        executorService = Executors.newFixedThreadPool(3);
        try {
            while (true) {
                // poll() never returns null; an empty result means no new records
                ConsumerRecords<String, String> records = consumer.poll(10);
                if (!records.isEmpty()) {
                    executorService.submit(new ConsumerThreadAuto(records, consumer));
                }
            }
        } catch (WakeupException e) {
            // shutdown() was called from another thread; fall through and close
        } finally {
            // close on the polling thread, the only one allowed to touch the consumer
            consumer.close();
        }
    }

    public void shutdown() {
        // wakeup() is the only KafkaConsumer method that may be called from
        // another thread; it makes the blocked poll() throw WakeupException
        consumer.wakeup();
        try {
            if (executorService != null) {
                executorService.shutdown();
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout waiting for worker threads");
                }
            }
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }
}
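The "Auto" in the class name refers to auto-commit: offsets advance in the background whether or not the worker threads have finished with the records. If you want the CURRENT-OFFSET reported by kafka-consumer-groups.sh to move only after the records are actually processed, disable auto-commit and commit manually. A minimal sketch of that variant of the poll loop (not part of the original example; process() is a hypothetical handler):

// assumes props.put("enable.auto.commit", "false") in the constructor
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(10);
    if (!records.isEmpty()) {
        process(records);      // hypothetical synchronous processing step
        consumer.commitSync(); // commit only after processing succeeded
    }
}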
ConsumerThreadAuto.java
package test.kafka.vm.half_multi_thread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Multiple worker threads: with several workers handling batches in parallel,
 * per-partition ordering of message processing can no longer be guaranteed.
 *
 * @author tanjie
 */
public final class ConsumerThreadAuto implements Runnable {

    private ConsumerRecords<String, String> records;

    private KafkaConsumer<String, String> consumer;

    public ConsumerThreadAuto(ConsumerRecords<String, String> records,
                              KafkaConsumer<String, String> consumer) {
        this.records = records;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Current thread: " + Thread.currentThread() + ", "
                    + "offset: " + record.offset() + ", " + "topic: "
                    + record.topic() + ", " + "partition: " + record.partition()
                    + ", " + "value: " + record.value());
        }
    }
}
ConsumerAutoMain.java
package test.kafka.vm.half_multi_thread;

/**
 * Created by szh on 2018/10/10.
 */
public class ConsumerAutoMain {

    public static void main(String[] args) {
        final Kafka_ConsumerAuto kafka_consumerAuto = new Kafka_ConsumerAuto();
        // execute() blocks in its poll loop, so run it on its own thread;
        // otherwise the sleep/shutdown below would never be reached
        Thread poller = new Thread(new Runnable() {
            @Override
            public void run() {
                kafka_consumerAuto.execute();
            }
        });
        poller.start();
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafka_consumerAuto.shutdown();
        }
    }
}