
8. Kafka Consumer and Offset Commits

Original article: https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
When Kafka was originally created, it shipped with a Scala producer and consumer client. Over time we came to realize many of the limitations of these APIs. For example, we had a “high-level” consumer API which supported consumer groups and handled failover, but didn’t support many of the more complex usage scenarios. We also had a “simple” consumer client which provided full control, but required users to manage failover and error handling themselves. So we set about redesigning these clients in order to open up many use cases that were hard or impossible with the old clients and establish a set of APIs we could support over the long haul.

The first phase of this was rewriting the Producer API in 0.8.1. The recent 0.9 release completed the second phase with the introduction of the new Consumer API. Building on top of a new group coordination protocol provided by Kafka itself, the new consumer brings the following advantages:

Clean Consolidated API: the new consumer combines the capabilities of both the older “simple” and “high-level” consumer clients, providing both group coordination and lower level access to build your own consumption strategy with.
Reduced Dependencies: the new consumer is written in pure Java. It has no dependence on the Scala runtime or on Zookeeper, which makes it a much lighter library to include in your project.
Better Security: the security extensions implemented in Kafka 0.9 are only supported by the new consumer.
This new consumer also adds a set of protocols for managing fault-tolerant groups of consumer processes. Previously this functionality was implemented with a thick Java client (that interacted heavily with Zookeeper). The complexity of this logic made it hard to build fully featured consumers in other languages. With the introduction of this new protocol, this has now become far easier. In fact, we’ve moved the C client over to this protocol already.
Although the new consumer uses a redesigned API and a new coordination protocol, the concepts are not fundamentally different, so users familiar with the old consumer shouldn’t have much trouble understanding it. However, there are some subtle details, in particular with respect to group management and the threading model, which require some extra care. The purpose of this tutorial is to cover the basic usage of the new consumer and explain all of these details.

One word of caution: at the time of writing, the new consumer is still considered “beta” in terms of stability. We have fixed several important bugs in the 0.9.0 branch, so if you run into any problems using the 0.9.0.0 release of Kafka, we encourage you to test against that branch. If you still see issues, please report them on the mailing list or on JIRA.

Getting Started
Before getting into the code, we should review some basic concepts. In Kafka, each topic is divided into a set of logs known as partitions. Producers write to the tail of these logs and consumers read the logs at their own pace. Kafka scales topic consumption by distributing partitions among a consumer group, which is a set of consumers sharing a common group identifier. The diagram below shows a single topic with three partitions and a consumer group with two members. Each partition in the topic is assigned to exactly one member in the group.

[Figure: Distributing Kafka partitions among consumer groups]

While the old consumer depended on Zookeeper for group management, the new consumer uses a group coordination protocol built into Kafka itself. For each group, one of the brokers is selected as the group coordinator. The coordinator is responsible for managing the state of the group. Its main job is to mediate partition assignment when new members arrive, old members depart, and when topic metadata changes. The act of reassigning partitions is known as rebalancing the group.
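Rebalancing can be pictured as re-running an assignment function whenever group membership changes. The following is a toy, in-memory sketch of that idea, not Kafka's implementation: ToyAssignor is a hypothetical name, and it uses a simple round-robin over sorted member ids, whereas the real client delegates this step to a configurable assignment strategy (the partition.assignment.strategy setting) whose default may spread partitions differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model for illustration only: NOT Kafka's coordinator or assignor.
class ToyAssignor {
    // Assigns partitions 0..numPartitions-1 round-robin over the members sorted by id,
    // so every partition ends up with exactly one owner.
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String member : sorted) {
            result.put(member, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return result; // no members: nothing can be assigned
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }
}
```

With members c1 and c2 and three partitions, the sketch gives c1 partitions 0 and 2 and c2 partition 1; if c2 leaves and the function is run again, c1 owns all three. Either way each partition has exactly one owner, which is the property the group protocol maintains.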

When a group is first initialized, the consumers typically begin reading from either the earliest or latest offset in each partition. The messages in each partition log are then read sequentially. As the consumer makes progress, it commits the offsets of messages it has successfully processed. For example, in the figure below, the consumer’s position is at offset 6 and its last committed offset is at offset 1.

[Figure: The consumer’s position in the log]

When a partition gets reassigned to another consumer in the group, the initial position is set to the last committed offset. If the consumer in the example above suddenly crashed, then the group member taking over the partition would begin consumption from offset 1. In that case, it would have to reprocess the messages up to the crashed consumer’s position of 6.
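The arithmetic of this crash scenario can be written down directly. This is a toy model (ToyPartitionState is a hypothetical class, not part of the Kafka client) that tracks a consumer's position and last committed offset for one partition and shows what a new owner inherits.

```java
// Toy model of one partition's consumption state; NOT part of the Kafka client API.
class ToyPartitionState {
    final long position;  // offset of the next message this consumer will read
    final long committed; // last offset committed for this partition in the group

    ToyPartitionState(long position, long committed) {
        this.position = position;
        this.committed = committed;
    }

    // On reassignment, the new owner starts reading from the last committed offset.
    static ToyPartitionState takeOver(ToyPartitionState crashed) {
        return new ToyPartitionState(crashed.committed, crashed.committed);
    }

    // Messages between the committed offset and the crashed consumer's position are read again.
    static long reprocessedCount(ToyPartitionState crashed) {
        return crashed.position - crashed.committed;
    }
}
```

For the figure's numbers, new ToyPartitionState(6, 1) hands the new owner a starting position of 1, and reprocessedCount reports 5 messages (offsets 1 through 5) that are read a second time.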

The diagram also shows two other significant positions in the log. The log end offset is the offset of the last message written to the log. The high watermark is the offset of the last message that was successfully copied to all of the log’s replicas. From the perspective of the consumer, the main thing to know is that you can only read up to the high watermark. This prevents the consumer from reading unreplicated data which could later be lost.
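A small model of the high watermark rule follows. It is a toy with hypothetical names, not broker code, and it represents offsets the way Kafka reports them in its tools: each replica's log end offset is the offset at which that replica will append its next message (one past the last message it holds), so the high watermark is the smallest of these, and a consumer can fetch only offsets strictly below it.

```java
import java.util.Collections;
import java.util.List;

// Toy model of the high watermark rule; NOT broker code.
class ToyHighWatermark {
    // Each entry is one replica's log end offset: the offset where that replica
    // will append its next message. Offsets below the smallest one exist on every replica.
    static long highWatermark(List<Long> replicaLogEndOffsets) {
        return Collections.min(replicaLogEndOffsets);
    }

    // Number of messages a fetch starting at fetchOffset may return,
    // since consumers only read offsets strictly below the high watermark.
    static long readableFrom(long fetchOffset, List<Long> replicaLogEndOffsets) {
        return Math.max(0L, highWatermark(replicaLogEndOffsets) - fetchOffset);
    }
}
```

With replicas whose log end offsets are 10, 8 and 9, the high watermark is 8: a consumer positioned at offset 6 can receive offsets 6 and 7, and one positioned at 8 receives nothing until the slowest replica catches up.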

Configuration and Initialization
To get started with the consumer, add the kafka-clients dependency to your project. The Maven snippet is provided below:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.0-cp1</version>
</dependency>

The consumer is constructed using a Properties object just like the other Kafka clients. In the example below, we provide the minimal configuration needed to use consumer groups.

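import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);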

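To try out the examples, first create a topic with three partitions and write some test data to it using the verifiable producer script:

bin/kafka-topics.sh --create --topic consumer-tutorial --replication-factor 1 --partitions 3 --zookeeper localhost:2181

bin/kafka-verifiable-producer.sh --topic consumer-tutorial --max-messages 200000 --broker-list localhost:9092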

Then we can create a small driver to set up a consumer group with three members, all subscribed to the same topic we have just created.

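import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public static void main(String[] args) {
  int numConsumers = 3;
  String groupId = "consumer-tutorial-group";
  List<String> topics = Arrays.asList("consumer-tutorial");
  ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

  // Start one ConsumerLoop (a Runnable that owns its own KafkaConsumer) per thread.
  final List<ConsumerLoop> consumers = new ArrayList<>();
  for (int i = 0; i < numConsumers; i++) {
    ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
    consumers.add(consumer);
    executor.submit(consumer);
  }

  // On process exit, ask every loop to stop, then wait up to 5 seconds for the threads to finish.
  Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
      for (ConsumerLoop consumer : consumers) {
        consumer.shutdown();
      }
      executor.shutdown();
      try {
        executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  });
}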
This example submits the three runnable consumers to an executor. Each thread is given a separate id so that you can see which thread is receiving data. The shutdown hook is invoked when you stop the process; it halts the three threads using wakeup and waits for them to shut down. If you run this, you should see lots of data from all of the threads. Here is a sample from one run:

2: {partition=0, offset=928, value=2786}
2: {partition=0, offset=929, value=2789}
1: {partition=2, offset=297, value=891}
2: {partition=0, offset=930, value=2792}
1: {partition=2, offset=298, value=894}
2: {partition=0, offset=931, value=2795}
0: {partition=1, offset=278, value=835}
2: {partition=0, offset=932, value=2798}
0: {partition=1, offset=279, value=838}
1: {partition=2, offset=299, value=897}
1: {partition=2, offset=300, value=900}
1: {partition=2, offset=301, value=903}
1: {partition=2, offset=302, value=906}
1: {partition=2, offset=303, value=909}
1: {partition=2, offset=304, value=912}
0: {partition=1, offset=280, value=841}
2: {partition=0, offset=933, value=2801}
The output shows consumption across all three partitions. Each partition has been assigned to one of the threads. Within each partition, you can see the offsets increasing as expected. You can shut down the process using Ctrl-C from the command line or through your IDE.
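The shutdown sequence described above relies on the consumer's wakeup mechanism: calling wakeup() from another thread makes a blocked poll (or, if none is blocked, the next one) throw WakeupException, the loop exits, and the consumer is closed in a finally block. The sketch below imitates that pattern without a broker. ToyConsumer, ToyWakeupException and ToyConsumerLoop are hypothetical stand-ins, not the tutorial's ConsumerLoop or the Kafka classes; a real loop would call KafkaConsumer.poll and catch org.apache.kafka.common.errors.WakeupException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins that imitate the wakeup pattern; NOT the Kafka client classes.
class ToyWakeupException extends RuntimeException {}

class ToyConsumer {
    private final BlockingQueue<String> records = new LinkedBlockingQueue<>();
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    volatile boolean closed = false;

    void append(String value) {
        records.add(value);
    }

    // Blocks until a record is available; throws ToyWakeupException once wakeup() was called,
    // whether the call happened during this poll or before it.
    String poll() throws InterruptedException {
        while (true) {
            if (wakeupRequested.getAndSet(false)) {
                throw new ToyWakeupException();
            }
            String value = records.poll(10, TimeUnit.MILLISECONDS);
            if (value != null) {
                return value;
            }
        }
    }

    // Safe to call from another thread, e.g. a shutdown hook.
    void wakeup() {
        wakeupRequested.set(true);
    }

    void close() {
        closed = true;
    }
}

class ToyConsumerLoop implements Runnable {
    final ToyConsumer consumer = new ToyConsumer();
    final List<String> seen = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void run() {
        try {
            while (true) {
                seen.add(consumer.poll()); // "process" each record by remembering it
            }
        } catch (ToyWakeupException e) {
            // Expected: shutdown() was called.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close(); // release resources however the loop ends
        }
    }

    // Called from another thread to stop the loop.
    void shutdown() {
        consumer.wakeup();
    }
}
```

Keeping the close() call in finally means the consumer is released however the loop ends, which is why the driver's shutdown hook only needs to call shutdown() on each loop and then wait for the executor.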

Consumer Liveness
When part of a consumer group, each consumer is assigned a subset of the partitions from topics it has subscribed to. This is basically a group lock on those partitions. As long as the lock is held, no other members in the group will be able to read from them. When your consumer is healthy, this is exactly what you want. It’s the only way that you can avoid duplicate consumption. But if the consumer dies due to a machine or application failure, you need that lock to be released so that the partitions can be assigned to a healthy member.

Kafka’s group coordination protocol addresses this problem using a heartbeat mechanism. After every rebalance, all members of the current generation begin sending periodic heartbeats to the group coordinator. As long as the coordinator continues receiving heartbeats, it assumes that members are healthy. On every received heartbeat, the coordinator starts (or resets) a timer. If no heartbeat is received when the timer expires, the coordinator marks the member dead and signals the rest of the group that they should rejoin so that partitions can be reassigned. The duration of the timer is known as the session timeout and is configured on the client with the setting session.timeout.ms.

props.put("session.timeout.ms", "60000");
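The coordinator's side of this mechanism amounts to one timer per member. The toy below (hypothetical names, not broker code) records the time of each member's last heartbeat and reports which members have gone longer than the session timeout without one; timestamps are passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the coordinator's per-member session timer; NOT broker code.
class ToySessionTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    ToySessionTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Every heartbeat starts (or resets) that member's timer.
    void heartbeat(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    // Members whose timer has run out; a coordinator would mark them dead and rebalance the group.
    List<String> expiredMembers(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() > sessionTimeoutMs) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

With the 60-second timeout configured above, a member last heard from at time 0 is still considered alive at 60000 ms but is reported as expired at 70000 ms, while a member that sent a heartbeat at 50000 ms survives both checks.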

The session timeout ensures that the lock will be released if the machine or application crashes or if a network partition isolates the consumer from the coordinator. However, application failures are a little trickier to handle generally. Just because the consumer is still sending heartbeats to the coordinator does not necessarily mean that the application is healthy.

The consumer’s poll loop is designed to handle this problem. All network IO is done in the foreground when you call poll or one of the other blocking APIs. The consumer does not use any background threads. This means that heartbeats are only sent to the coordinator when you call poll. If your application stops polling (whether because the processing code has thrown an exception or a downstream system has crashed), then no heartbeats will be sent, the session timeout will expire, and the group will be rebalanced.

The only problem with this is that a spurious rebalance might be triggered if the consumer takes longer than the session timeout to process messages. You should therefore set the session timeout large enough to make this unlikely. The default is 30 seconds, but it’s not unreasonable to set it as high as several minutes. The only downside of a larger session timeout is that it will take longer for the coordinator to detect genuine consumer crashes.
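Because the 0.9 consumer sends heartbeats only from inside poll, the time spent processing one batch is effectively the gap the coordinator sees between heartbeats. The hypothetical helper below checks a list of per-batch processing times against a session timeout to find the first batch that would trigger a spurious rebalance. It ignores the time spent inside poll itself, so it is only an approximation.

```java
// Toy check of the poll-gap rule for the 0.9 consumer; hypothetical helper, NOT a Kafka API.
class ToyPollGapCheck {
    static final long DEFAULT_SESSION_TIMEOUT_MS = 30_000; // the 30-second default cited in the text

    // Returns the index of the first batch whose processing time exceeds the session timeout
    // (so no heartbeat would be sent in time), or -1 if every batch fits.
    static int firstSpuriousRebalance(long[] batchProcessingMs, long sessionTimeoutMs) {
        for (int i = 0; i < batchProcessingMs.length; i++) {
            if (batchProcessingMs[i] > sessionTimeoutMs) {
                return i;
            }
        }
        return -1;
    }
}
```

For batches taking 1, 25 and 45 seconds, the 30-second default flags the third batch, while a two-minute timeout flags none. Later client releases (0.10.1 onward) send heartbeats from a background thread and bound the time between polls with a separate setting, max.poll.interval.ms, so this reasoning applies to the 0.9 client discussed here.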

Delivery Semantics
When a consumer group is first created, the initial offset is set according to the policy defined by the auto.offset.reset configuration setting. Once the consumer begins processing, it commits offsets regularly according to the needs of the application. After every subsequent rebalance, the position will be set to the last committed offset for that partition in the group. If the consumer crashes before committing offsets for messages that have been successfully processed, then another consumer will end up repeating the work. The more frequently you commit offsets, the fewer duplicates you will see in a crash.

In the examples thus far, we have assumed that the automatic commit policy is enabled. When the setting enable.auto.commit is set to true (which is the default), the consumer automatically triggers offset commits periodically according to the interval configured with auto.commit.interval.ms. By reducing the commit interval, you can limit the amount of re-processing the consumer must do in the event of a crash.
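The trade-off between commit interval and duplicate work can be estimated with simple arithmetic. The toy function below assumes a steady processing rate and a commit exactly every commitIntervalMs (standing in for auto.commit.interval.ms); the real client performs its automatic commits as part of polling, so actual timing is less regular. It returns how many processed but uncommitted messages a new owner would repeat after a crash.

```java
// Toy estimate of duplicate work after a crash; hypothetical helper, NOT a Kafka API.
class ToyAutoCommitReplay {
    // Assumes messages are processed at a steady messagesPerSecond, offsets are committed
    // exactly at every multiple of commitIntervalMs (standing in for auto.commit.interval.ms),
    // and the consumer crashes at crashAtMs. Everything processed since the last commit
    // is repeated by the next owner of the partition.
    static long duplicatesAfterCrash(long messagesPerSecond, long commitIntervalMs, long crashAtMs) {
        long msSinceLastCommit = crashAtMs % commitIntervalMs;
        return messagesPerSecond * msSinceLastCommit / 1000;
    }
}
```

At 1000 messages per second with a crash at 12.5 seconds, a 5-second interval repeats 2500 messages and a 1-second interval repeats 500. In the worst case the duplicate count approaches one interval's worth of messages, which is why shortening the interval limits reprocessing.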

To use the consumer’s commit API, you should first disable automatic commit by setting enable.auto.commit to false in the consumer’s configuration.

props.put("enable.auto.commit", "false");

The commit API itself is trivial to use; what matters most is how it is integrated into the poll loop, so the example below includes the full loop. The easiest way to handle commits manually is with the synchronous commit API:

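try {
  while (running) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) System.out.println(record.offset() + ": " + record.value());
    consumer.commitSync(); // synchronously commits the offsets returned by the last poll()
  }
} finally { consumer.close(); }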

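While the group is active, you can inspect its partition assignments and consumption progress from the command line with the consumer groups script:

bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial-group --bootstrap-server localhost:9092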

This produces output like the following:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumer-tutorial-group, consumer-tutorial, 0, 6667, 6667, 0, consumer-1_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 1, 6667, 6667, 0, consumer-2_/127.0.0.1
consumer-tutorial-group, consumer-tutorial, 2, 6666, 6666, 0, consumer-3_/127.0.0.1
This shows all the partitions assigned within the consumer group, which consumer instance owns each one, and the last committed offset (reported here as the “current offset”). The lag of a partition is the difference between the log end offset and the last committed offset. Administrators can monitor this to ensure that the consumer group is keeping up with the producers.
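Lag can also be recomputed from the tool's output, for example in a monitoring script. The sketch below assumes the comma-separated column layout shown above (GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER); other Kafka versions format this output differently, so treat the parsing as an assumption. ToyLagCheck is a hypothetical name.

```java
import java.util.List;

// Hypothetical helper for rows in the column layout shown above:
// GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
class ToyLagCheck {
    // lag = log end offset - last committed ("current") offset
    static long lag(String row) {
        String[] cols = row.split(",\\s*");
        long committed = Long.parseLong(cols[3].trim());
        long logEnd = Long.parseLong(cols[4].trim());
        return logEnd - committed;
    }

    // Sum over all partitions; a total that keeps growing means the group is falling behind.
    static long totalLag(List<String> rows) {
        long total = 0;
        for (String row : rows) {
            total += lag(row);
        }
        return total;
    }
}
```

For the sample rows above every partition has lag 0, meaning the group has consumed and committed everything written so far.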

Using Manual Assignment
As mentioned at the start of this tutorial, the new consumer implements lower level access for use cases which don’t need consumer groups. The convenience of this is one of the strongest reasons to adopt this API. The older “simple” consumer also provided this, but it required you to do a lot of error handling yourself. With the new consumer, you just need to assign the partitions you want to read from and then start polling for data.

The example below shows how to assign all the partitions from a topic using the partitionsFor API.

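import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

List<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo partition : consumer.partitionsFor(topic))
  partitions.add(new TopicPartition(topic, partition.partition()));
consumer.assign(partitions); // replaces any previous assignment with this full list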
Similar to subscribe, the call to assign must pass the full list of partitions you want to read from. Once partitions are assigned, the poll loop will work exactly like before.

One word of caution, however. All offset commits go through the group coordinator regardless of whether it is a simple consumer or a consumer group. Hence if you need to commit offsets, then you still must set group.id to a reasonable value to prevent conflicts with other consumers. If a simple consumer tries to commit offsets with a group id which matches an active consumer group, the coordinator will reject the commit (which will result in a CommitFailedException). However, there won’t be any errors if another simple consumer instance shares the same group id.
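The commit rules in the paragraph above can be summarized as a toy offset store. Everything here is hypothetical and heavily simplified: ToyCommitRejected stands in for CommitFailedException, and the store keeps one committed offset per group and partition. It shows the two cases described: a commit from a manually assigned consumer is rejected when its group id belongs to an active consumer group, and two simple consumers sharing a group id raise no error, so in this model the last commit silently wins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the commit rules above; NOT the real group coordinator.
// ToyCommitRejected is a local stand-in for Kafka's CommitFailedException.
class ToyCommitRejected extends RuntimeException {
    ToyCommitRejected(String message) {
        super(message);
    }
}

class ToyOffsetStore {
    private final Set<String> activeGroups = new HashSet<>();    // groups with live subscribed members
    private final Map<String, Long> committed = new HashMap<>(); // key: "groupId/partition"

    void markGroupActive(String groupId) {
        activeGroups.add(groupId);
    }

    // Commit from a consumer that uses manual assignment instead of group membership.
    void simpleCommit(String groupId, int partition, long offset) {
        if (activeGroups.contains(groupId)) {
            throw new ToyCommitRejected("group " + groupId + " has active members");
        }
        // Nothing coordinates simple consumers with each other, so the last writer wins.
        committed.put(groupId + "/" + partition, offset);
    }

    Long committedOffset(String groupId, int partition) {
        return committed.get(groupId + "/" + partition);
    }
}
```

In practice this is the reason to give each independent simple consumer its own group.id when it commits offsets.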

Conclusion
The new consumer brings a number of benefits to the Kafka community including a cleaner API, better security, and reduced dependencies. This tutorial introduced its basic usage with a focus on poll semantics and using the commit API to control delivery semantics. There are many more details to cover, but this should be enough to get you started. Although the consumer is still being actively worked on, we encourage you to give it a try. If you run into any problems, tell us about it on the mailing list.