利用kafka內建的特殊topic 做offsets 提交和檢索
In Kafka releases through 0.8.1.1, consumers commit their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count
). Fortunately, Kafka
now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory
offsets cache for faster access). i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory look ups.
The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code that shows how to use the new Kafka-based offset storage mechanism.
Step 1: Discover and connect to the offset manager for a consumer group by issuing a consumer metadata request to any broker
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.network.BlockingChannel;
import java.util.*;
...
try {
BlockingChannel
channel = new BlockingChannel( "localhost" , 9092 ,
BlockingChannel.UseDefaultBufferSize(),
BlockingChannel.UseDefaultBufferSize(),
5000 /*
read timeout in millis */ );
channel.connect();
final String
MY_GROUP = "demoGroup" ;
final String
MY_CLIENTID = "demoClientId" ;
int correlationId
= 0 ;
final TopicAndPartition
testPartition0 = new TopicAndPartition( "demoTopic" , 0 );
final TopicAndPartition
testPartition1 = new TopicAndPartition( "demoTopic" , 1 );
channel.send( new ConsumerMetadataRequest(MY_GROUP,
ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
ConsumerMetadataResponse
metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());
if (metadataResponse.errorCode()
== ErrorMapping.NoError()) {
Broker
offsetManager = metadataResponse.coordinator();
//
if the coordinator is different, from the above channel's host then reconnect
channel.disconnect();
channel
= new BlockingChannel(offsetManager.host(),
offsetManager.port(),
BlockingChannel.UseDefaultBufferSize(),
|