
Getting the latest Kafka offset in Java


I previously wrote about how to fetch the latest Kafka offsets in Scala.

In most cases, however, we need to do this from Java.

The following Java code retrieves the latest offset of every partition of a topic.

GetOffsetShellWrap

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

public class GetOffsetShellWrap {

    private static Logger log = LoggerFactory.getLogger(GetOffsetShellWrap.class);

    private String topic;
    private int port;
    private String host;
    private int time;

    public GetOffsetShellWrap(String topic, int port, String host, int time) {
        this.topic = topic;
        this.port = port;
        this.host = host;
        this.time = time;
    }

    public Map<String, String> getEveryPartitionMaxOffset() {
        // 1. Fetch all partitions of the topic and their metadata => Map<partitionId, PartitionMetadata>
        TreeMap<Integer, PartitionMetadata> partitionIdAndMeta = findTopicEveryPartition();

        Map<String, String> map = new HashMap<String, String>();
        for (Entry<Integer, PartitionMetadata> entry : partitionIdAndMeta.entrySet()) {
            int leaderPartitionId = entry.getKey();
            // 2. Use the partition metadata to find the host of the partition's leader broker
            String leadBroker = entry.getValue().leader().host();
            String clientName = "Client_" + topic + "_" + leaderPartitionId;
            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
            // 3. Fetch the partition's offset from the leader
            long readOffset = getLastOffset(consumer, topic, leaderPartitionId, clientName);
            map.put(String.valueOf(leaderPartitionId), String.valueOf(readOffset));
            if (consumer != null)
                consumer.close();
        }
        return map;
    }

    private TreeMap<Integer, PartitionMetadata> findTopicEveryPartition() {
        TreeMap<Integer, PartitionMetadata> map = new TreeMap<Integer, PartitionMetadata>();
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
            List<TopicMetadata> metaData = resp.topicsMetadata();
            if (metaData != null && !metaData.isEmpty()) {
                TopicMetadata item = metaData.get(0);
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    map.put(part.partitionId(), part);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null)
                consumer.close();
        }
        return map;
    }

    private long getLastOffset(SimpleConsumer consumer, String topic, int leaderPartitionId, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, leaderPartitionId);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            log.error("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, leaderPartitionId));
            return 0;
        }
        long[] offsets = response.offsets(topic, leaderPartitionId);
        return offsets[0];
    }

}
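The time value passed to the constructor ends up in PartitionOffsetRequestInfo: -1 corresponds to kafka.api.OffsetRequest.LatestTime() (the newest offset) and -2 to kafka.api.OffsetRequest.EarliestTime() (the oldest offset still retained). As a small sketch, the wrapper can also be constructed with the named constants rather than a literal; the topic and host names here are just placeholders:

// Sketch: construct the wrapper with the named constants from the old consumer API.
// LatestTime() returns -1L and EarliestTime() returns -2L, so the cast to int is lossless.
int latestTime = (int) kafka.api.OffsetRequest.LatestTime();     // newest offset of each partition
int earliestTime = (int) kafka.api.OffsetRequest.EarliestTime(); // oldest offset still retained

GetOffsetShellWrap latestSearch = new GetOffsetShellWrap("2017-11-6-test", 9092, "hadoop-01", latestTime);
System.out.println(latestSearch.getEveryPartitionMaxOffset());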
GetOffsetShellWrapJavaTest

import java.util.Map;

public class GetOffsetShellWrapJavaTest {
    public static void main(String[] args) {
        int port = 9092;
        String topic = "2017-11-6-test";
        int time = -1;
        GetOffsetShellWrap offsetSearch = new GetOffsetShellWrap(topic, port, "hadoop-01", time);
        Map<String, String> map = offsetSearch.getEveryPartitionMaxOffset();
        for (String key : map.keySet()) {
            System.out.println(key + "---" + map.get(key));
        }
    }
}
Output:

0---16096
1---15930
2---16099
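For reference, the SimpleConsumer API used above is the old (0.8-era) low-level consumer. On Kafka 0.10.1 and later, the same per-partition latest offsets can be read with the newer KafkaConsumer API via endOffsets, without any leader lookup. A minimal sketch, assuming the same broker address and topic name as in the test above:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-01:9092"); // assumed broker address, as in the test above
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            String topic = "2017-11-6-test";
            // Collect every partition of the topic.
            List<TopicPartition> partitions = new ArrayList<TopicPartition>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
            // endOffsets returns the offset of the next message to be written, i.e. the latest offset.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                System.out.println(entry.getKey().partition() + "---" + entry.getValue());
            }
        }
    }
}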