1. 程式人生 > >Kafka Java Consumer實現(一)

Kafka Java Consumer實現(一)

Kafka提供了兩種Consumer API,分別是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API)

High Level Consumer API:高度抽象的Kafka消費者API;將底層具體獲取資料、更新offset、設定偏移量等操作遮蔽掉,直接將操作資料流的處理工作提供給編寫程式的人員。優點是:操作簡單;缺點:可操作性太差,無法按照自己的業務場景選擇處理方式。(入口類:ConsumerConnector)

Lower Level Consumer API:通過直接操作底層API獲取資料的方式獲取Kafka中的資料,需要自行給定分割槽、偏移量等屬性。優點:可操作性強;缺點:程式碼相對而言比較複雜。(入口類:SimpleConsumer) 

一、KafkaBrokerInfo:自定義bean類,主要功能儲存連線kafka的broker的元資料,比如host&port;程式碼如下:

/**
 * Kafka伺服器連線引數
 * Created by gerry on 12/21.
 */
public class KafkaBrokerInfo {
    // 主機名
    public final String brokerHost;
    // 埠號
    public final int brokerPort;

    /**
     * 構造方法
     *
     * @param brokerHost Kafka伺服器主機或者IP地址
     * @param brokerPort 埠號
     */
    public KafkaBrokerInfo(String brokerHost, int brokerPort) {
        this.brokerHost = brokerHost;
        this.brokerPort = brokerPort;
    }

    /**
     * 構造方法, 使用預設埠號9092進行構造
     *
     * @param brokerHost
     */
    public KafkaBrokerInfo(String brokerHost) {
        this(brokerHost, 9092);
    }
}

二、KafkaTopicPartitionInfo:自定義bean類,主要功能是儲存讀取具體分割槽的資訊,包括topic名稱和partition ID;程式碼如下:

/**
 * Created by gerry on 02/22.
 */
public class KafkaTopicPartitionInfo {
    // 主題名稱
    public final String topic;
    // 分割槽id
    public final int partitionID;

    /**
     * 建構函式
     *
     * @param topic       主題名稱
     * @param partitionID 分割槽id
     */
    public KafkaTopicPartitionInfo(String topic, int partitionID) {
        this.topic = topic;
        this.partitionID = partitionID;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        KafkaTopicPartitionInfo that = (KafkaTopicPartitionInfo) o;

        if (partitionID != that.partitionID) return false;
        return topic != null ? topic.equals(that.topic) : that.topic == null;

    }

    @Override
    public int hashCode() {
        int result = topic != null ? topic.hashCode() : 0;
        result = 31 * result + partitionID;
        return result;
    }
}

三、JavaKafkaSimpleConsumerAPI:具體通過Kafka提供的LowerAPI操作Kafka的相關程式碼,包括資料的讀取、偏移量的讀取、更新等操作;具體程式碼如下:

import kafka.api.*;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;

/**
 * TODO: 新增必要的日誌列印資訊
 * Kafka Lower consumer api ==> Kafka Simple Consumer API
 * Created by gerry on 12/21.
 */
public class JavaKafkaSimpleConsumerAPI {
    // 最大重試次數
    private int maxRetryTimes = 5;
    // 重試間隔時間
    private long retryIntervalMillis = 1000;
    // 快取Topic/Partition對應的Broker連線資訊
    private Map<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>> replicaBrokers = new HashMap<KafkaTopicPartitionInfo, List<KafkaBrokerInfo>>();

    /**
     * 執行入口
     *
     * @param maxReads           最多讀取記錄數量
     * @param topicPartitionInfo 讀取資料的topic分割槽資訊
     * @param seedBrokers        連線topic分割槽的初始化連線資訊
     * @throws Exception
     */
    public void run(long maxReads,
                    KafkaTopicPartitionInfo topicPartitionInfo,
                    List<KafkaBrokerInfo> seedBrokers) throws Exception {
        // 預設消費資料的偏移量是當前分割槽的最早偏移量值
        long whichTime = kafka.api.OffsetRequest.EarliestTime();

        // 構建client name及groupId
        String topic = topicPartitionInfo.topic;
        int partitionID = topicPartitionInfo.partitionID;
        String clientName = this.createClientName(topic, partitionID);
        String groupId = clientName;

        // 獲取當前topic分割槽對應的分割槽元資料(主要包括leader節點的連線資訊)
        PartitionMetadata metadata = this.findLeader(seedBrokers, topic, partitionID);

        // 校驗元資料
        this.validatePartitionMetadata(metadata);


        // 連線leader節點構建具體的SimpleConsumer物件
        SimpleConsumer consumer = this.createSimpleConsumer(metadata.leader().host(),
                metadata.leader().port(), clientName);

        try {
            // 獲取當前topic、當前consumer的消費資料offset偏移量
            int times = 0;
            long readOffSet = -1;
            while (true) {
                readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID, whichTime, clientName);
                if (readOffSet == -1) {
                    // 當返回為-1的時候,表示異常資訊
                    if (times > this.maxRetryTimes) {
                        throw new RuntimeException("Fetch the last offset of those group:" + groupId + " occur exception");
                    }
                    // 先休眠,再重新構建Consumer連線
                    times++;
                    this.sleep();
                    consumer = this.createNewSimpleConsumer(consumer, topic, partitionID);
                    continue;
                }

                // 正常情況下,結束迴圈
                break;
            }
            System.out.println("The first read offset is:" + readOffSet);

            int numErrors = 0;
            boolean ever = maxReads <= 0;
            // 開始資料讀取操作迴圈,當maxReads為非正數的時候,一直讀取資料;當maxReads為正數的時候,最多讀取maxReads條資料
            while (ever || maxReads > 0) {
                // 構建獲取資料的請求物件, 給定獲取資料對應的topic、partition、offset以及每次獲取資料最多獲取條數
                kafka.api.FetchRequest request = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(topic, partitionID, readOffSet, 100000)
                        .build();

                // 傳送請求到Kafka,並獲得返回值
                FetchResponse response = consumer.fetch(request);

                // 如果返回物件表示存在異常,進行異常處理,並進行consumer重新連線的操作
                // 當異常連續出現次數超過5次的時候,程式丟擲異常
                if (response.hasError()) {
                    String leaderBrokerHost = consumer.host();
                    numErrors++;
                    short code = response.errorCode(topic, partitionID);
                    System.out.println("Error fetching data from the Broker:" + leaderBrokerHost + " Reason:" + code);
                    if (numErrors > 5) break;
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // 異常表示是offset異常,重新獲取偏移量即可
                        readOffSet = this.getLastOffSet(consumer, groupId, topic, partitionID, kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    consumer.close();
                    consumer = null;

                    // 重新建立一個SimpleConsumer物件
                    consumer = this.createNewSimpleConsumer(consumer, topic, partitionID);
                    continue;
                }
                // 重置失敗次數
                numErrors = 0;

                // 接收資料沒有異常,那麼開始對資料進行具體操作,eg: 列印
                long numRead = 0;
                for (MessageAndOffset messageAndOffset : response.messageSet(topic, partitionID)) {
                    // 校驗偏移量
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffSet) {
                        System.out.println("Found and old offset:" + currentOffset + " Expection:" + readOffSet);
                        continue;
                    }

                    // 獲取下一個讀取資料開始的偏移量
                    readOffSet = messageAndOffset.nextOffset();

                    // 讀取資料的value
                    ByteBuffer payload = messageAndOffset.message().payload();

                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    System.out.println(currentOffset + ": " + new String(bytes, "UTF-8"));
                    numRead++;
                    maxReads--;
                }

                // 更新偏移量
                consumer = this.updateOffset(consumer, topic, partitionID,
                        readOffSet, groupId, clientName, 0);

                // 如果沒有讀取資料,休眠一秒鐘
                if (numRead == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        // nothings
                    }
                }
            }

            System.out.println("執行完成....");
        } finally {
            // 關閉資源
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (Exception e) {
                    // nothings
                }
            }
        }
    }

    /**
     * 驗證分割槽元資料,如果驗證失敗,直接丟擲IllegalArgumentException異常
     *
     * @param metadata
     */
    private void validatePartitionMetadata(PartitionMetadata metadata) {
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting!!");
            throw new IllegalArgumentException("Can't find metadata for Topic and Partition. Exiting!!");
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting!!");
            throw new IllegalArgumentException("Can't find Leader for Topic and Partition. Exiting!!");
        }
    }

    /**
     * Finding the Lead Broker for a Topic and Partition<br/>
     * 獲取主題和分割槽對應的主Broker節點(即topic和分割槽id是給定引數的對應brokere節點的元資料)<br/>
     * 獲取方式:
     *
     * @param brokers     Kafka叢集連線引數,eg: {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
     * @param topic       topic名稱
     * @param partitionID 分割槽id
     * @return
     */
    public PartitionMetadata findLeader(
            List<KafkaBrokerInfo> brokers,
            String topic,
            int partitionID) {
        PartitionMetadata returnMetadata = null;

        for (KafkaBrokerInfo broker : brokers) {
            SimpleConsumer consumer = null;

            try {
                // 1. 建立簡單的消費者連線物件
                consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, 100000, 64 * 1024, "leaderLookUp");

                // 2. 構建獲取引數的Topic名稱引數集合
                List<String> topics = Collections.singletonList(topic);

                // 3. 構建請求引數
                TopicMetadataRequest request = new TopicMetadataRequest(topics);

                // 4. 請求資料,得到返回物件
                TopicMetadataResponse response = consumer.send(request);

                // 5. 獲取返回值
                List<TopicMetadata> metadatas = response.topicsMetadata();

                // 6. 遍歷返回值
                for (TopicMetadata metadata : metadatas) {
                    // 獲取當前metadata對應的分割槽
                    String currentTopic = metadata.topic();
                    if (topic.equalsIgnoreCase(currentTopic)) {
                        // 遍歷所有分割槽的原始資料 ==> 當前分割槽的元資料
                        for (PartitionMetadata part : metadata.partitionsMetadata()) {
                            if (part.partitionId() == partitionID) {
                                // 1. 找到對應的元資料
                                returnMetadata = part;

                                // 2. 更新備份節點的host資料
                                if (returnMetadata != null) {
                                    KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
                                    List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
                                    if (brokerInfos == null) {
                                        brokerInfos = new ArrayList<KafkaBrokerInfo>();
                                    } else {
                                        brokerInfos.clear();
                                    }

                                    for (Broker replica : returnMetadata.replicas()) {
                                        brokerInfos.add(new KafkaBrokerInfo(replica.host(), replica.port()));
                                    }

                                    this.replicaBrokers.put(topicPartitionInfo, brokerInfos);
                                    return returnMetadata;
                                }
                            }
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + broker.brokerHost + "] to find Leader for [" + topic + ", " + partitionID + "] Reason:" + e);
            } finally {
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (Exception e) {
                        // nothings
                    }
                }
            }
        }

        // 沒有找到,返回一個空值,預設情況下,不會返回該值
        return null;
    }

    /**
     * 獲取當前groupID對應的consumer在對應的topic和partition中對應的offset偏移量
     *
     * @param consumer    消費者
     * @param groupId     消費者分割槽id
     * @param topic       所屬的Topic
     * @param partitionID 所屬的分割槽ID
     * @param whichTime   用於判斷,當consumer從沒有消費資料的時候,從當前topic的Partition的那個offset開始讀取資料
     * @param clientName  client名稱
     * @return 正常情況下,返回非負數,當出現異常的時候,返回-1
     */
    public long getLastOffSet(SimpleConsumer consumer, String groupId,
                              String topic, int partitionID,
                              long whichTime, String clientName) {
        // 1. 從ZK中獲取偏移量,當zk的返回偏移量大於0的時候,表示是一個正常的偏移量
        long offset = this.getOffsetOfTopicAndPartition(consumer, groupId, clientName, topic, partitionID);
        if (offset > 0) {
            return offset;
        }

        // 2. 獲取當前topic當前分割槽的資料偏移量
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

        OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionID));
            return -1;
        }

        // 獲取偏移量
        long[] offsets = response.offsets(topic, partitionID);
        return offsets[0];
    }

    /**
     * 從儲存consumer消費者offset偏移量的位置獲取當前consumer對應的偏移量
     *
     * @param consumer    消費者
     * @param groupId     Group Id
     * @param clientName  client名稱
     * @param topic       topic名稱
     * @param partitionID 分割槽id
     * @return
     */
    public long getOffsetOfTopicAndPartition(SimpleConsumer consumer, String groupId, String clientName, String topic, int partitionID) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        List<TopicAndPartition> requestInfo = new ArrayList<TopicAndPartition>();
        requestInfo.add(topicAndPartition);
        OffsetFetchRequest request = new OffsetFetchRequest(groupId, requestInfo, 0, clientName);
        OffsetFetchResponse response = consumer.fetchOffsets(request);

        // 獲取返回值
        Map<TopicAndPartition, OffsetMetadataAndError> returnOffsetMetadata = response.offsets();
        // 處理返回值
        if (returnOffsetMetadata != null && !returnOffsetMetadata.isEmpty()) {
            // 獲取當前分割槽對應的偏移量資訊
            OffsetMetadataAndError offset = returnOffsetMetadata.get(topicAndPartition);
            if (offset.error() == ErrorMapping.NoError()) {
                // 沒有異常,表示是正常的,獲取偏移量
                return offset.offset();
            } else {
                // 當Consumer第一次連線的時候(zk中不在當前topic對應資料的時候),會產生UnknownTopicOrPartitionCode異常
                System.out.println("Error fetching data Offset Data the Topic and Partition. Reason: " + offset.error());
            }
        }

        // 所有異常情況直接返回0
        return 0;
    }

    /**
     * 根據給定引數獲取一個新leader的分割槽元資料資訊
     *
     * @param oldLeader
     * @param topic
     * @param partitionID
     * @return
     */
    private PartitionMetadata findNewLeaderMetadata(String oldLeader,
                                                    String topic,
                                                    int partitionID) {
        KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
        List<KafkaBrokerInfo> brokerInfos = this.replicaBrokers.get(topicPartitionInfo);
        for (int i = 0; i < 3; i++) {
            boolean gotoSleep = false;
            PartitionMetadata metadata = this.findLeader(brokerInfos, topic, partitionID);
            if (metadata == null) {
                gotoSleep = true;
            } else if (metadata.leader() == null) {
                gotoSleep = true;
            } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // leader切換過程中
                gotoSleep = true;
            } else {
                return metadata;
            }

            if (gotoSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // nothings
                }
            }
        }

        System.out.println("Unable to find new leader after Broker failure. Exiting!!");
        throw new RuntimeException("Unable to find new leader after Broker failure. Exiting!!");
    }

    /**
     * 更新偏移量,當SimpleConsumer發生變化的時候,重新構造一個新的SimpleConsumer並返回
     *
     * @param consumer
     * @param topic
     * @param partitionID
     * @param readOffSet
     * @param groupId
     * @param clientName
     * @param times
     * @return
     * @throws RuntimeException 當更新失敗的情況下
     */
    private SimpleConsumer updateOffset(SimpleConsumer consumer, String topic, int partitionID, long readOffSet, String groupId, String clientName, int times) {
        // 構建請求物件
        Map<TopicAndPartition, OffsetAndMetadata> requestInfoMap = new HashMap<TopicAndPartition, OffsetAndMetadata>();
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionID);
        requestInfoMap.put(topicAndPartition, new OffsetAndMetadata(readOffSet, OffsetAndMetadata.NoMetadata(), -1));
        kafka.javaapi.OffsetCommitRequest ocRequest = new OffsetCommitRequest(groupId, requestInfoMap, 0, clientName);
        // 提交修改偏移量的請求,並獲取返回值
        kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(ocRequest);

        // 根據返回值進行不同的操作
        if (response.hasError()) {
            short code = response.errorCode(topicAndPartition);
            if (times > this.maxRetryTimes) {
                throw new RuntimeException("Update the Offset occur exception," +
                        " the current response code is:" + code);
            }

            if (code == ErrorMapping.LeaderNotAvailableCode()) {
                // 當異常code為leader切換情況的時候,重新構建consumer物件
                // 操作步驟:先休眠一段時間,再重新構造consumer物件,最後重試
                try {
                    Thread.sleep(this.retryIntervalMillis);
                } catch (InterruptedException e) {
                    // nothings
                }
                PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(),
                        topic, partitionID);
                this.validatePartitionMetadata(metadata);
                consumer = this.createSimpleConsumer(metadata.leader().host(),
                        metadata.leader().port(), clientName);
                // 重試
                consumer = updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
            }

            if (code == ErrorMapping.RequestTimedOutCode()) {
                // 當異常為請求超時的時候,進行重新請求
                consumer = updateOffset(consumer, topic, partitionID, readOffSet, groupId, clientName, times + 1);
            }

            // 其他code直接丟擲異常
            throw new RuntimeException("Update the Offset occur exception," +
                    " the current response code is:" + code);
        }

        // 返回修改後的consumer物件
        return consumer;
    }

    /**
     * 構建clientName根據主題名稱和分割槽id
     *
     * @param topic
     * @param partitionID
     * @return
     */
    private String createClientName(String topic, int partitionID) {
        return "client_" + topic + "_" + partitionID;
    }

    /**
     * 根據一個老的consumer,重新建立一個consumer物件
     *
     * @param consumer
     * @param topic
     * @param partitionID
     * @return
     */
    private SimpleConsumer createNewSimpleConsumer(SimpleConsumer consumer, String topic, int partitionID) {
        // 重新獲取新的leader節點
        PartitionMetadata metadata = this.findNewLeaderMetadata(consumer.host(),
                topic, partitionID);
        // 校驗元資料
        this.validatePartitionMetadata(metadata);
        // 重新建立consumer的連線
        return this.createSimpleConsumer(metadata.leader().host(),
                metadata.leader().port(), consumer.clientId());
    }

    /**
     * 構建一個SimpleConsumer並返回
     *
     * @param host
     * @param port
     * @param clientName
     * @return
     */
    private SimpleConsumer createSimpleConsumer(String host, int port, String clientName) {
        return new SimpleConsumer(host, port, 100000, 64 * 1024, clientName);
    }

    /**
     * 休眠一段時間
     */
    private void sleep() {
        try {
            Thread.sleep(this.maxRetryTimes);
        } catch (InterruptedException e) {
            // nothings
        }
    }

    /**
     * 關閉對應資源
     *
     * @param consumer
     */
    private static void closeSimpleConsumer(SimpleConsumer consumer) {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (Exception e) {
                // nothings
            }
        }
    }

    /**
     * 從Kafka叢集中獲取指定topic的分割槽ID<br/>
     * 如果叢集中不存在對應的topic,那麼返回一個empty的集合
     *
     * @param brokers    Kafka叢集連線引數,eg: {"hadoop-senior01" -> 9092, "hadoop-senior02" -> 9092}
     * @param topic      要獲取ID對應的主題
     * @param soTimeout  過期時間
     * @param bufferSize 緩衝區大小
     * @param clientId   client連線ID
     * @return
     */
    public static List<Integer> fetchTopicPartitionIDs(List<KafkaBrokerInfo> brokers, String topic, int soTimeout, int bufferSize, String clientId) {
        Set<Integer> partitionIDs = new HashSet<Integer>();

        List<String> topics = Collections.singletonList(topic);

        // 連線所有的Kafka伺服器,然後獲取引數 ==> 遍歷連線
        for (KafkaBrokerInfo broker : brokers) {
            SimpleConsumer consumer = null;

            try {
                // 構建簡單消費者連線物件
                consumer = new SimpleConsumer(broker.brokerHost, broker.brokerPort, soTimeout, bufferSize, clientId);

                // 構建請求引數
                TopicMetadataRequest tmRequest = new TopicMetadataRequest(topics);

                // 傳送請求
                TopicMetadataResponse response = consumer.send(tmRequest);

                // 獲取返回結果
                List<TopicMetadata> metadatas = response.topicsMetadata();

                // 遍歷返回結果,獲取對應topic的結果值
                for (TopicMetadata metadata : metadatas) {
                    if (metadata.errorCode() == ErrorMapping.NoError()) {
                        // 沒有異常的情況下才進行處理
                        if (topic.equals(metadata.topic())) {
                            // 處理當前topic對應的分割槽
                            for (PartitionMetadata part : metadata.partitionsMetadata()) {
                                partitionIDs.add(part.partitionId());
                            }
                            // 處理完成,結束迴圈
                            break;
                        }
                    }
                }
            } finally {
                // 關閉連線
                closeSimpleConsumer(consumer);
            }
        }

        // 返回結果
        return new ArrayList<Integer>(partitionIDs);
    }


}

四、JavaKafkaSimpleConsumerAPITest:測試類;主要程式碼如下:

import java.util.ArrayList;
import java.util.List;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaSimpleConsumerAPITest {
    public static void main(String[] args) {
        JavaKafkaSimpleConsumerAPI example = new JavaKafkaSimpleConsumerAPI();
        long maxReads = 300;
        String topic = "test2";
        int partitionID = 0;

        KafkaTopicPartitionInfo topicPartitionInfo = new KafkaTopicPartitionInfo(topic, partitionID);
        List<KafkaBrokerInfo> seeds = new ArrayList<KafkaBrokerInfo>();
        seeds.add(new KafkaBrokerInfo("192.168.187.146", 9092));

        try {
            example.run(maxReads, topicPartitionInfo, seeds);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 獲取該topic所屬的所有分割槽ID列表
        System.out.println(example.fetchTopicPartitionIDs(seeds, topic, 100000, 64 * 1024, "client-id"));
    }
}

五、測試

六、Kafka Pom檔案依賴

<properties>
    <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>