Kafka Java Consumer Implementation (Part 1)
Kafka provides two Consumer APIs: the High Level Consumer API and the Low Level Consumer API (Simple Consumer API).
High Level Consumer API: a highly abstracted Kafka consumer API. It hides the low-level details of fetching data, committing offsets, and choosing the starting offset, and hands the programmer a ready-to-use message stream. Advantage: simple to use. Disadvantage: very little control; you cannot tailor consumption to your own business scenario. (Entry class: ConsumerConnector)
Low Level Consumer API (Simple Consumer API): reads data from Kafka by calling the low-level API directly; you must specify the partition, offset, and related properties yourself. Advantage: full control. Disadvantage: the code is comparatively complex. (Entry class: SimpleConsumer)
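For comparison, here is a minimal High Level Consumer sketch. It is only a sketch, assuming a ZooKeeper address of 192.168.187.146:2181 and a group id of test-group (both hypothetical), plus the topic test2 that the test class below also uses:
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class JavaKafkaHighLevelConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed addresses and ids; replace with your own environment
        props.put("zookeeper.connect", "192.168.187.146:2181");
        props.put("group.id", "test-group");
        // Start from the earliest offset when the group has no committed offset yet
        props.put("auto.offset.reset", "smallest");
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // Ask for a single stream (one consumer thread) for the topic
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test2", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("test2").get(0).iterator();
        while (it.hasNext()) {
            // Offsets are tracked and committed by the framework, not by this code
            MessageAndMetadata<byte[], byte[]> mm = it.next();
            System.out.println(mm.offset() + ": " + new String(mm.message()));
        }
    }
}
Everything below reimplements by hand what these few lines get for free (leader discovery, offset bookkeeping, failover), in exchange for full control over all three.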
1. KafkaBrokerInfo: a custom bean class that stores the metadata needed to connect to a Kafka broker, i.e. host & port. Code:
/**
 * Kafka broker connection parameters.
 * Created by gerry on 12/21.
 */
public class KafkaBrokerInfo {
    // Host name
    public final String brokerHost;
    // Port number
    public final int brokerPort;

    /**
     * Constructor.
     *
     * @param brokerHost Kafka broker host name or IP address
     * @param brokerPort port number
     */
    public KafkaBrokerInfo(String brokerHost, int brokerPort) {
        this.brokerHost = brokerHost;
        this.brokerPort = brokerPort;
    }

    /**
     * Constructor using the default port 9092.
     *
     * @param brokerHost Kafka broker host name or IP address
     */
    public KafkaBrokerInfo(String brokerHost) {
        this(brokerHost, 9092);
    }
}
2. KafkaTopicPartitionInfo: a custom bean class that identifies the partition to read, i.e. topic name and partition ID. Code:
/**
 * Created by gerry on 02/22.
 */
public class KafkaTopicPartitionInfo {
    // Topic name
    public final String topic;
    // Partition id
    public final int partitionID;

    /**
     * Constructor.
     *
     * @param topic       topic name
     * @param partitionID partition id
     */
    public KafkaTopicPartitionInfo(String topic, int partitionID) {
        this.topic = topic;
        this.partitionID = partitionID;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        KafkaTopicPartitionInfo that = (KafkaTopicPartitionInfo) o;

        if (partitionID != that.partitionID) return false;
        return topic != null ? topic.equals(that.topic) : that.topic == null;
    }

    @Override
    public int hashCode() {
        int result = topic != null ? topic.hashCode() : 0;
        result = 31 * result + partitionID;
        return result;
    }
}
3. JavaKafkaSimpleConsumerAPI: the code that drives Kafka through the low-level (Simple Consumer) API, covering reading data as well as reading and committing offsets. Code:
import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.*;
/**
* TODO: add the necessary log output
* Kafka low-level consumer API ==> Kafka Simple Consumer API
* Created by gerry on 12/21.
*/
public class JavaKafkaSimpleConsumerAPI {
// Maximum number of retries
private int maxRetryTimes = 5;
// Retry interval in milliseconds
private long retryIntervalMillis = 1000;
// Cache of the replica brokers for each topic/partition
private Map<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>> replicaBrokers = new HashMap<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>>();
/**
 * Entry point.
 *
 * @param maxReads           maximum number of records to read
 * @param topicPartitionInfo the topic partition to read from
 * @param seedBrokers        seed brokers used to bootstrap the connection to the partition
 * @throws Exception
 */
public void run(long maxReads,
KafkaTopicPartitionInfo topicPartitionInfo,
List<KafkaBrokerInfo> seedBrokers) throws Exception {
// By default, start consuming from the earliest offset in the partition
long whichTime = kafka.api.OffsetRequest.EarliestTime();
// Build the client name and group id
String topic = topicPartitionInfo.topic;
int partitionID = topicPartitionInfo.partitionID;
String clientName = this.createClientName(topic, partitionID);
String groupId = clientName;
// Fetch the partition metadata for this topic partition (mainly the leader's connection info)
PartitionMetadata metadata = this.findLeader(seedBrokers, topic, partitionID);
// Validate the metadata
this.validatePartitionMetadata(metadata);
// Connect to the leader and build the SimpleConsumer
SimpleConsumer consumer = this.createSimpleConsumer(metadata.leader().host(),
metadata.leader().port(), clientName);
try {
// Fetch the offset at which this consumer group should start reading this topic partition
int times = 0;
long readOffSet = -1;
while (true) {
readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID, whichTime, clientName);
if (readOffSet == -1) {
// A return value of -1 indicates an error
if (times > this.maxRetryTimes) {
throw new RuntimeException("Failed to fetch the last offset for group:" + groupId);
}
// Sleep first, then rebuild the consumer connection
times++;
this.sleep();
consumer = this.createNewSimpleConsumer(consumer, topic, partitionID);
continue;
}
// Normal case: exit the loop
break;
}
System.out.println("The first read offset is:" + readOffSet);
int numErrors = 0;
boolean ever = maxReads <= 0;
// Main read loop: if maxReads is non-positive, read forever; otherwise read at most maxReads records
while (ever || maxReads > 0) {
// Build the fetch request: topic, partition, starting offset, and the maximum number of bytes (not records) per fetch
kafka.api.FetchRequest request = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(topic, partitionID, readOffSet, 100000)
.build();
// Send the request to Kafka and get the response
FetchResponse response = consumer.fetch(request);
// If the response reports an error, handle it and reconnect the consumer;
// after more than 5 consecutive errors, give up and exit the read loop
if (response.hasError()) {
String leaderBrokerHost = consumer.host();
numErrors++;
short code = response.errorCode(topic, partitionID);
System.out.println("Error fetching data from the Broker:" + leaderBrokerHost + " Reason:" + code);
if (numErrors > 5) break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// Offset out of range: fetch a fresh offset and retry
readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID, kafka.api.OffsetRequest.LatestTime(), clientName);
continue;
}
// Rebuild a SimpleConsumer against the (possibly new) leader; build it from the
// old consumer before closing it, so its host and client id are still readable
SimpleConsumer oldConsumer = consumer;
consumer = this.createNewSimpleConsumer(oldConsumer, topic, partitionID);
oldConsumer.close();
continue;
}
// Reset the error counter
numErrors = 0;
// No error: process the fetched messages, e.g. print them
long numRead = 0;
for (MessageAndOffset messageAndOffset : response.messageSet(topic, partitionID)) {
// Skip messages older than the requested offset (compressed message sets may return them)
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffSet) {
System.out.println("Found an old offset:" + currentOffset + " Expecting:" + readOffSet);
continue;
}
// Offset from which the next read should start
readOffSet = messageAndOffset.nextOffset();
// Extract the message value
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(currentOffset + ": " + new String(bytes, "UTF-8"));
numRead++;
maxReads--;
}
// Commit the updated offset
consumer = this.updateOffset(consumer, topic, partitionID,
readOffSet, groupId, clientName, 0);
// If nothing was read, sleep for one second
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (Exception e) {
// nothing
}
}
}
System.out.println("執行完成....");
} finally {
// Release resources
if (consumer != null) {
try {
consumer.close();
} catch (Exception e) {
// nothing
}
}
}
}
/**
 * Validate the partition metadata; throws an IllegalArgumentException if validation fails.
 *
 * @param metadata
 */
private void validatePartitionMetadata(PartitionMetadata metadata) {
if (metadata == null) {
System.out.println("Can't find metadata for Topic and Partition. Exiting!!");
throw new IllegalArgumentException("Can't find metadata for Topic and Partition. Exiting!!");
}
if (metadata.leader() == null) {
System.out.println("Can't find Leader for Topic and Partition. Exiting!!");
throw new IllegalArgumentException("Can't find Leader for Topic and Partition. Exiting!!");
}
}
/**
 * Finding the Lead Broker for a Topic and Partition<br/>
 * Returns the metadata of the broker that leads the given topic and partition.<br/>
 * How it works: query each seed broker for the topic metadata until the requested partition is found.
 *
 * @param brokers     Kafka cluster connection parameters, e.g. {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
 * @param topic       topic name
 * @param partitionID partition id
 * @return
 */
public PartitionMetadata findLeader(
List<KafkaBrokerInfo> brokers,
String topic,
int partitionID) {
PartitionMetadata returnMetadata = null;
for (KafkaBrokerInfo broker : brokers) {
SimpleConsumer consumer = null;
try {
// 1. Create a SimpleConsumer connection
consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, 100000, 64 * 1024, "leaderLookUp");
// 2. Build the list of topic names to query
List<String> topics = Collections.singletonList(topic);
// 3. Build the request
TopicMetadataRequest request = new TopicMetadataRequest(topics);
// 4. Send the request and receive the response
TopicMetadataResponse response = consumer.send(request);
// 5. Extract the returned metadata
List<TopicMetadata> metadatas = response.topicsMetadata();
// 6. Iterate over the metadata
for (TopicMetadata metadata : metadatas) {
// Topic to which this metadata entry belongs
String currentTopic = metadata.topic();
if (topic.equalsIgnoreCase(currentTopic)) {
// Scan the metadata of all partitions for the requested partition
for (PartitionMetadata part : metadata.partitionsMetadata()) {
if (part.partitionId() == partitionID) {
// 1. Found the matching metadata
returnMetadata = part;
// 2. Cache the replica brokers' host info
if (returnMetadata != null) {
KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
if (brokerInfos == null) {
brokerInfos = new ArrayList<KafkaBrokerInfo>();
} else {
brokerInfos.clear();
}
for (Broker replica : returnMetadata.replicas()) {
brokerInfos.add(new KafkaBrokerInfo(replica.host(), replica.port()));
}
this.replicaBrokers.put(topicPartitionInfo, brokerInfos);
return returnMetadata;
}
}
}
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + broker.brokerHost + "] to find Leader for [" + topic + ", " + partitionID + "] Reason:" + e);
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (Exception e) {
// nothing
}
}
}
}
// Not found: return null (normally this point is never reached)
return null;
}
/**
 * Get the offset at which the given consumer group should resume reading the given topic and partition.
 *
 * @param consumer    the consumer
 * @param groupId     consumer group id
 * @param topic       topic name
 * @param partitionID partition id
 * @param whichTime   which offset of the partition to start from when the group has never consumed (earliest or latest)
 * @param clientName  client name
 * @return a non-negative offset in the normal case; -1 when an error occurs
 */
public long getLastOffSet(SimpleConsumer consumer, String groupId,
String topic, int partitionID,
long whichTime, String clientName) {
// 1. Fetch the committed offset from ZooKeeper; a value greater than 0 is a valid offset
long offset = this.getOffsetOfTopicAndPartition(consumer, groupId, clientName, topic, partitionID);
if (offset > 0) {
return offset;
}
// 2. Otherwise ask the broker for the partition's own offset (earliest or latest, depending on whichTime)
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partitionID));
return -1;
}
// Extract the offset
long[] offsets = response.offsets(topic, partitionID);
return offsets[0];
}
/**
 * Fetch this consumer group's committed offset from the offset store (ZooKeeper in this setup).
 *
 * @param consumer    the consumer
 * @param groupId     group id
 * @param clientName  client name
 * @param topic       topic name
 * @param partitionID partition id
 * @return
 */
public long getOffsetOfTopicAndPartition(SimpleConsumer consumer, String groupId, String clientName, String topic, int partitionID) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
List<TopicAndPartition> requestInfo = new ArrayList<TopicAndPartition>();
requestInfo.add(topicAndPartition);
OffsetFetchRequest request = new OffsetFetchRequest(groupId, requestInfo, 0, clientName);
OffsetFetchResponse response = consumer.fetchOffsets(request);
// Extract the response payload
Map<TopicAndPartition, OffsetMetadataAndError> returnOffsetMetadata = response.offsets();
// Process it
if (returnOffsetMetadata != null && !returnOffsetMetadata.isEmpty()) {
// Offset info for the requested partition
OffsetMetadataAndError offset = returnOffsetMetadata.get(topicAndPartition);
if (offset.error() == ErrorMapping.NoError()) {
// No error: return the offset
return offset.offset();
} else {
// When the consumer connects for the first time (no data for this topic in ZK yet),
// an UnknownTopicOrPartitionCode error is returned
System.out.println("Error fetching offset data for the topic and partition. Reason: " + offset.error());
}
}
// In all error cases, return 0
return 0;
}
/**
 * Find the partition metadata of the new leader for the given topic and partition.
 *
 * @param oldLeader
 * @param topic
 * @param partitionID
 * @return
 */
private PartitionMetadata findNewLeaderMetadata(String oldLeader,
String topic,
int partitionID) {
KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
for (int i = 0; i < 3; i++) {
boolean gotoSleep = false;
PartitionMetadata metadata = this.findLeader(brokerInfos, topic, partitionID);
if (metadata == null) {
gotoSleep = true;
} else if (metadata.leader() == null) {
gotoSleep = true;
} else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// First attempt and the old leader is still reported: the leader switch may still be in progress
gotoSleep = true;
} else {
return metadata;
}
if (gotoSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// nothing
}
}
}
System.out.println("Unable to find new leader after Broker failure. Exiting!!");
throw new RuntimeException("Unable to find new leader after Broker failure. Exiting!!");
}
/**
 * Commit the offset. When the SimpleConsumer has to be replaced (e.g. after a leader switch),
 * build and return a new SimpleConsumer.
 *
 * @param consumer
 * @param topic
 * @param partitionID
 * @param readOffSet
 * @param groupId
 * @param clientName
 * @param times
 * @return
 * @throws RuntimeException when the commit fails
 */
private SimpleConsumer updateOffset(SimpleConsumer consumer, String topic, int partitionID, long readOffSet, String groupId, String clientName, int times) {
// Build the commit request
Map<TopicAndPartition, OffsetAndMetadata> requestInfoMap = new HashMap<TopicAndPartition, OffsetAndMetadata>();
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
requestInfoMap.put(topicAndPartition, new OffsetAndMetadata(readOffSet, OffsetAndMetadata.NoMetadata(), -1));
kafka.javaapi.OffsetCommitRequest ocRequest = new OffsetCommitRequest(groupId, requestInfoMap, 0, clientName);
// Send the offset-commit request and collect the response
kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(ocRequest);
// Act on the response
if (response.hasError()) {
short code = response.errorCode(topicAndPartition);
if (times > this.maxRetryTimes) {
throw new RuntimeException("Failed to update the offset;" +
" the current response code is:" + code);
}
if (code == ErrorMapping.LeaderNotAvailableCode()) {
// LeaderNotAvailable means a leader switch is in progress:
// sleep for a while, rebuild the consumer against the new leader, then retry
try {
Thread.sleep(this.retryIntervalMillis);
} catch (InterruptedException e) {
// nothing
}
PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(),
topic, partitionID);
this.validatePartitionMetadata(metadata);
consumer = this.createSimpleConsumer(metadata.leader().host(),
metadata.leader().port(), clientName);
// Retry, returning the (possibly new) consumer on success
return updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
}
if (code == ErrorMapping.RequestTimedOutCode()) {
// On a request timeout, simply retry
return updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
}
// Any other error code: give up and throw
throw new RuntimeException("Failed to update the offset;" +
" the current response code is:" + code);
}
// Return the (possibly rebuilt) consumer
return consumer;
}
/**
 * Build the client name from the topic name and partition id.
 *
 * @param topic
 * @param partitionID
 * @return
 */
private String createClientName(String topic, int partitionID) {
return "client_" + topic + "_" + partitionID;
}
/**
 * Re-create a consumer from an old one, connecting to the current leader.
 *
 * @param consumer
 * @param topic
 * @param partitionID
 * @return
 */
private SimpleConsumer createNewSimpleConsumer(SimpleConsumer consumer, String topic, int partitionID) {
// Look up the new leader
PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(),
topic, partitionID);
// Validate the metadata
this.validatePartitionMetadata(metadata);
// Rebuild the consumer connection
return this.createSimpleConsumer(metadata.leader().host(),
metadata.leader().port(), consumer.clientId());
}
/**
 * Build and return a SimpleConsumer.
 *
 * @param host
 * @param port
 * @param clientName
 * @return
 */
private SimpleConsumer createSimpleConsumer(String host, int port, String clientName) {
return new SimpleConsumer(host, port, 100000, 64 * 1024, clientName);
}
/**
 * Sleep for the retry interval between retries.
 */
private void sleep() {
try {
Thread.sleep(this.retryIntervalMillis);
} catch (InterruptedException e) {
// nothing
}
}
/**
 * Close the given consumer, ignoring any errors.
 *
 * @param consumer
 */
private static void closeSimpleConsumer(SimpleConsumer consumer) {
if (consumer != null) {
try {
consumer.close();
} catch (Exception e) {
// nothing
}
}
}
/**
 * Fetch the partition IDs of the given topic from the Kafka cluster.<br/>
 * If the topic does not exist in the cluster, an empty list is returned.
 *
 * @param brokers    Kafka cluster connection parameters, e.g. {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
 * @param topic      the topic whose partition IDs are requested
 * @param soTimeout  socket timeout
 * @param bufferSize buffer size
 * @param clientId   client id
 * @return
 */
public static List<Integer> fetchTopicPartitionIDs(List<KafkaBrokerInfo> brokers, String topic, int soTimeout, int bufferSize, String clientId) {
Set<Integer> partitionIDs = new HashSet<Integer>();
List<String> topics = Collections.singletonList(topic);
// Connect to each Kafka broker in turn and request the topic metadata
for (KafkaBrokerInfo broker : brokers) {
SimpleConsumer consumer = null;
try {
// Build a SimpleConsumer connection
consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, soTimeout, bufferSize, clientId);
// Build the request
TopicMetadataRequest tmRequest = new TopicMetadataRequest(topics);
// Send the request
TopicMetadataResponse response = consumer.send(tmRequest);
// Get the response payload
List<TopicMetadata> metadatas = response.topicsMetadata();
// Iterate over the results and pick out the requested topic
for (TopicMetadata metadata : metadatas) {
if (metadata.errorCode() == ErrorMapping.NoError()) {
// Only process entries that carry no error
if (topic.equals(metadata.topic())) {
// Collect this topic's partition IDs
for (PartitionMetadata part : metadata.partitionsMetadata()) {
partitionIDs.add(part.partitionId());
}
// Done; exit the loop
break;
}
}
}
} finally {
// Close the connection
closeSimpleConsumer(consumer);
}
}
// Return the result
return new ArrayList<Integer>(partitionIDs);
}
}
4. JavaKafkaSimpleConsumerAPITest: the test class. Main code:
import java.util.ArrayList;
import java.util.List;
/**
* Created by gerry on 12/21.
*/
public class JavaKafkaSimpleConsumerAPITest {
public static void main(String[] args) {
JavaKafkaSimpleConsumerAPI example = new JavaKafkaSimpleConsumerAPI();
long maxReads = 300;
String topic = "test2";
int partitionID = 0;
KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
List<KafkaBrokerInfo> seeds = new ArrayList<KafkaBrokerInfo>();
seeds.add(new KafkaBrokerInfo("192.168.187.146", 9092));
try {
example.run(maxReads, topicPartitionInfo, seeds);
} catch (Exception e) {
e.printStackTrace();
}
// Fetch the list of all partition IDs belonging to the topic
System.out.println(JavaKafkaSimpleConsumerAPI.fetchTopicPartitionIDs(seeds, topic, 100000, 64 * 1024, "client-id"));
}
}
5. Testing
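To verify, run JavaKafkaSimpleConsumerAPITest against a broker (192.168.187.146:9092 in the example) whose topic test2 already contains data. Each consumed record is printed as offset: value; after 300 records the read loop ends, and the list of partition IDs is printed ([0] for a single-partition topic). Running the program a second time should continue from the committed offset rather than from the beginning, which confirms that updateOffset persisted the group's position.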
6. Kafka POM dependency
<properties>
<kafka.version>0.8.2.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
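Note that kafka_2.10 is the Kafka 0.8.2.1 artifact built against Scala 2.10; the kafka.javaapi.* classes used above ship inside it. The code in this article is therefore tied to the 0.8.x line: SimpleConsumer and the other old Scala clients were deprecated in later releases and removed in Kafka 2.0, where org.apache.kafka.clients.consumer.KafkaConsumer with assign()/seek() covers the same manual-partition use case.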