Kafka Legacy Consumer API Example (Low-Level Consumer)
Kafka's legacy high-level consumer API depends heavily on ZooKeeper. Newer Kafka versions store consumer offsets in an internal topic instead, so on a recent version you should prefer the new consumer API.
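For comparison, here is a minimal sketch of the new consumer API. It assumes the org.apache.kafka:kafka-clients artifact (0.10.x or later); the group id demo-group and the topic test are example values, not part of the original article.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.100:9092");
        props.put("group.id", "demo-group"); // example group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // offsets are committed to the internal __consumer_offsets topic, not ZooKeeper
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                // poll(long) is the 0.10.x-style call; newer clients prefer poll(Duration)
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("message: " + record.value() + ", offset: " + record.offset());
                }
            }
        }
    }
}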
The main steps to implement a consumer against the low-level API are:
- Fetch the metadata of the relevant partition of the target topic
- Find the leader replica of the partition, create a SimpleConsumer, and open a connection to the leader
- Build the fetch request
- Fetch the data and process it
- Handle the offsets
- React when a broker change occurs, so that messages keep being consumed normally
1. Create the class, with the imports it needs, and define some constants:
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class LowConsumerAPI {

    /** broker list */
    private static final String BROKER_LIST = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";

    /** connection timeout: 1 min */
    private static final int TIME_OUT = 60 * 1000;

    /** read buffer size: 1 MB */
    private static final int BUFFER_SIZE = 1024 * 1024;

    /** maximum number of bytes to fetch per request */
    private static final int FETCH_SIZE = 100000;

    /** number of retries on error */
    // private static final int RETRIES_TIME = 3;

    /** maximum number of errors tolerated */
    private static final int MAX_ERROR_NUM = 3;
2. Define a method that fetches the metadata of the given partition of the given topic:
    /**
     * Fetch the metadata of the given partition of the given topic.
     */
    private PartitionMetadata fetchPartitionMetadata(List<String> brokerList, String topic, int partitionId) {
        SimpleConsumer consumer = null;
        TopicMetadataRequest metadataRequest = null;
        TopicMetadataResponse metadataResponse = null;
        List<TopicMetadata> topicMetadatas = null;
        try {
            /*
             * Loop over the brokers because we do not know which one
             * hosts the leader of the given partition.
             */
            for (String host : brokerList) {
                // 1. build a SimpleConsumer; it is what actually fetches the metadata
                String[] hostAndPort = host.split(":");
                // the last argument is the clientId
                consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]),
                        TIME_OUT, BUFFER_SIZE, topic + "-" + partitionId + "-" + "client");
                // 2. build the request for the topic's metadata
                metadataRequest = new TopicMetadataRequest(Arrays.asList(topic));
                // 3. talk to the broker via send(), sending the TopicMetadataRequest to fetch the metadata
                try {
                    metadataResponse = consumer.send(metadataRequest);
                } catch (Exception e) {
                    // the connection to the broker may have been lost
                    System.out.println("get TopicMetadataResponse failed!");
                    e.printStackTrace();
                    continue;
                }
                // 4. get the list of TopicMetadata; each requested topic maps to one TopicMetadata object
                topicMetadatas = metadataResponse.topicsMetadata();
                // 5. iterate over the topic metadata list
                for (TopicMetadata topicMetadata : topicMetadatas) {
                    // 6. find the PartitionMetadata of the requested partition
                    for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                        if (partitionMetadata.partitionId() == partitionId) {
                            return partitionMetadata;
                        }
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Fetch PartitionMetadata failed!");
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
        return null;
    }
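fetchPartitionMetadata returns a single partition's metadata, but the same response describes every partition of the topic. As a minimal side sketch (printPartitionLeaders is a hypothetical helper, not part of the original code), you could list all partitions and their leaders like this:

    // Hypothetical helper: list every partition of a topic and its current leader,
    // using the same TopicMetadataRequest/TopicMetadataResponse classes as above.
    private void printPartitionLeaders(String brokerHost, int brokerPort, String topic) {
        // "metadata-probe" is an arbitrary clientId chosen for this sketch
        SimpleConsumer consumer = new SimpleConsumer(brokerHost, brokerPort, TIME_OUT, BUFFER_SIZE, "metadata-probe");
        try {
            TopicMetadataResponse response = consumer.send(new TopicMetadataRequest(Arrays.asList(topic)));
            for (TopicMetadata topicMetadata : response.topicsMetadata()) {
                for (PartitionMetadata pm : topicMetadata.partitionsMetadata()) {
                    System.out.println("partition " + pm.partitionId() + " -> leader "
                            + (pm.leader() == null ? "none" : pm.leader().host() + ":" + pm.leader().port()));
                }
            }
        } finally {
            consumer.close();
        }
    }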
3. Find the leader node of the partition from its metadata:
    /**
     * Get the leader node of a partition from its metadata.
     */
    private String getLeader(PartitionMetadata metadata) {
        if (metadata.leader() == null) {
            System.out.println("can not find partition " + metadata.partitionId() + "'s leader!");
            return null;
        }
        return metadata.leader().host() + "_" + metadata.leader().port();
    }
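Host and port are joined with an underscore simply so the method can return them as a single String; the consume method in step 6 splits them apart again.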
4. Manage the offsets:
    /**
     * Get the consumer offset of the given partition of the given topic.
     */
private long getOffset(SimpleConsumer consumer, String topic, int partition, long beginTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        /*
         * PartitionOffsetRequestInfo(beginTime, 1) configures the strategy for looking up the offset.
         * beginTime can take one of two values:
         * kafka.api.OffsetRequest.EarliestTime(): the earliest available offset, which is not
         *   necessarily 0, because old log segments get deleted
         * kafka.api.OffsetRequest.LatestTime(): the latest offset
         */
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(beginTime, 1));
        // build the request for the offset lookup
OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if(response.hasError()) {
System.out.println("get offset failed!" + response.errorCode(topic, partition));
return -1;
}
long[] offsets = response.offsets(topic, partition);
if(offsets == null || offsets.length == 0) {
System.out.println("get offset failed! offsets is null");
return -1;
}
return offsets[0];
}
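As a usage sketch (printBacklog is a hypothetical helper, not part of the original code), the two offset strategies can be combined to see how many messages a partition currently retains:

    // Hypothetical helper: compare the earliest and latest offsets of a partition
    // to measure how many messages it currently retains.
    private void printBacklog(String leaderHost, int leaderPort, String topic, int partition) {
        String clientId = topic + "-" + partition + "-client";
        SimpleConsumer consumer = new SimpleConsumer(leaderHost, leaderPort, TIME_OUT, BUFFER_SIZE, clientId);
        try {
            long earliest = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientId);
            long latest = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientId);
            System.out.println("retained messages in partition " + partition + ": " + (latest - earliest));
        } finally {
            consumer.close();
        }
    }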
5. React appropriately when a broker change occurs:
    /**
     * Find the partition's new leader node after a broker change.
     */
private String findNewLeader(List<String> brokerList, String oldLeader, String topic, int partition) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = fetchPartitionMetadata(brokerList, topic, partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // on the first pass the old leader may still show up, either because ZooKeeper has not
                // finished re-electing a leader, or because the failed leader reconnected before failover completed
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
Thread.sleep(1000);
}
}
System.out.println("Unable to find new leader after Broker failure!");
throw new Exception("Unable to find new leader after Broker failure!");
}
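Note that findNewLeader returns only the new leader's host. The consume method below keeps using the original port when it reconnects, which only works if all brokers listen on the same port; a more robust version would return host and port together, as getLeader does.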
6. Define the consume method, which fetches the data and processes it:
    /**
     * Fetch and process the data.
     */
public void consume(List<String> brokerList, String topic, int partition) {
SimpleConsumer consumer = null;
try {
            // 1. fetch the partition metadata
PartitionMetadata metadata = fetchPartitionMetadata(brokerList,topic, partition);
if(metadata == null) {
System.out.println("can not find metadata!");
return;
}
            // 2. find the partition's leader node
String leaderBrokerAndPort = getLeader(metadata);
String[] brokersAndPorts = leaderBrokerAndPort.split("_");
String leaderBroker = brokersAndPorts[0];
int port = Integer.parseInt(brokersAndPorts[1]);
String clientId = topic + "-" + partition + "-" + "client";
            // 3. create a consumer to consume the messages
consumer = new SimpleConsumer(leaderBroker ,port ,TIME_OUT, BUFFER_SIZE, clientId);
            // 4. set the offset strategy: start from the earliest offset of the partition
long offset = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientId);
int errorCount = 0;
kafka.api.FetchRequest request = null;
kafka.javaapi.FetchResponse response = null;
while(offset > -1) {
                // the consumer may have been set to null during error handling, so re-instantiate it here
if(consumer == null) {
consumer = new SimpleConsumer(leaderBroker ,port , TIME_OUT, BUFFER_SIZE, clientId);
}
                // 5. build the fetch request
request = new FetchRequestBuilder().clientId(clientId).addFetch(topic, partition, offset, FETCH_SIZE).build();
                // 6. get the response and process it
response = consumer.fetch(request);
if(response.hasError()) {
errorCount ++;
if(errorCount > MAX_ERROR_NUM) {
break;
}
short errorCode = response.errorCode(topic, partition);
if(ErrorMapping.OffsetOutOfRangeCode() == errorCode) {
                        // the requested offset is out of range; as a simple recovery,
                        // fall back to the latest offset
offset = getOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientId);
continue;
} else if (ErrorMapping.OffsetsLoadInProgressCode() == errorCode) {
                        // the broker is still loading offsets; wait before retrying
                        Thread.sleep(300000);
continue;
} else {
consumer.close();
consumer = null;
                        // update the leader broker (the original port is reused; see the note in step 5)
leaderBroker = findNewLeader(brokerList, leaderBroker, topic, partition);
continue;
}
                } else {
                    // no error: reset the error counter
                    errorCount = 0;
long fetchCount = 0;
                    // process the messages
for(MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
long currentOffset = messageAndOffset.offset();
if(currentOffset < offset) {
System.out.println("get an old offset[" + currentOffset + "], excepted offset is offset[" + offset + "]");
continue;
}
offset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
                        // print the message to the console
System.out.println("message: " + new String(bytes, "UTF-8") + ", offset: " + messageAndOffset.offset());
fetchCount++;
}
if (fetchCount == 0) {
Thread.sleep(1000);
}
}
}
} catch (Exception e) {
System.out.println("exception occurs when consume message");
e.printStackTrace();
} finally {
if (consumer != null) {
consumer.close();
}
}
}
7. Define the main method to start the consumer:
public static void main(String[] args) {
LowConsumerAPI lowConsumerAPI = new LowConsumerAPI();
lowConsumerAPI.consume(Arrays.asList(BROKER_LIST.split(",")), "test", 0);
    }
}
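Note that these kafka.javaapi.* classes ship with the legacy Scala broker artifact (for example kafka_2.11 at versions 0.8.2.x through 1.1.x), not with the newer org.apache.kafka:kafka-clients jar; the old Scala consumers were removed in Kafka 2.0, so this example compiles only against those older artifacts.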