
Kafka low-level consumer explained

kafka

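Kafka is a high-throughput distributed publish-subscribe messaging system and an important part of big-data systems. At the time of writing, the latest version is kafka_2.11-0.10.2.0. Since version 0.9.0 the consumer API has been unified and no longer distinguishes between high-level and low-level, but many companies still use the old API, so it is worth revisiting the low-level API here.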

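Kafka SimpleConsumer data consumption flow diagram

image

Topic logical structure diagram

image

Based on the topic logical structure diagram, here is a rough description of the consumption flow: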

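  • Normally a topic has multiple partitions, which increases the topic's message throughput. Each partition also has multiple replicas (for example repl1-1, repl1-2, repl1-3), which improves data availability: data is not lost when some nodes go down.
  • Because a partition has multiple replicas, reads and writes go to the leader (the active replica) first, and the other follower replicas then synchronize the data by replication. To read from a partition you therefore first have to find that partition's leader (steps 1 and 2 in the flow diagram).
  • Once you have the partition's leader, the next question is where to start reading, that is, from which offset (steps 3 and 4). You can start from a specific offset, which requires maintaining the consumed offsets yourself (for example in ZooKeeper, HDFS, or a database), but usually one of the following two positions is specified:
    1. smallest: start from the smallest offset
    2. largest: start from the current largest offset
  • getMessage (steps 5 and 6)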
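The choice of starting position described in the third bullet can be sketched in plain Java. This is a minimal model rather than Kafka API code: `ResetPolicy`, `resolveStartOffset`, and passing the stored offset as an `OptionalLong` are assumptions made for illustration. The sketch prefers a stored offset while it still lies inside the partition's valid range, and otherwise falls back to the smallest or largest offset.

```java
import java.util.OptionalLong;

final class StartOffsetSketch {

    /** Hypothetical policy names mirroring the "smallest" / "largest" options. */
    enum ResetPolicy { SMALLEST, LARGEST }

    /**
     * Picks the offset to start reading from.
     *
     * @param stored   offset saved by the application (ZooKeeper, HDFS, DB), if any
     * @param earliest smallest offset still available in the partition
     * @param latest   largest offset reported for the partition
     * @param policy   fallback used when there is no usable stored offset
     */
    static long resolveStartOffset(OptionalLong stored, long earliest, long latest, ResetPolicy policy) {
        if (stored.isPresent()) {
            long offset = stored.getAsLong();
            // A stored offset is only usable while it is still inside the log.
            if (offset >= earliest && offset <= latest) {
                return offset;
            }
        }
        return policy == ResetPolicy.SMALLEST ? earliest : latest;
    }

    public static void main(String[] args) {
        // Stored offset inside the valid range: it is used as-is.
        System.out.println(resolveStartOffset(OptionalLong.of(500), 0, 69690, ResetPolicy.LARGEST));
        // No stored offset: fall back to the smallest offset.
        System.out.println(resolveStartOffset(OptionalLong.empty(), 0, 69690, ResetPolicy.SMALLEST));
        // Stored offset beyond the log: fall back to the largest offset.
        System.out.println(resolveStartOffset(OptionalLong.of(99999), 0, 69690, ResetPolicy.LARGEST));
    }
}
```

Keeping the decision in one small function makes it easy to swap where the stored offset comes from without touching the fetch loop.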

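Code

In a production environment, the complete steps for consuming data from a topic are:
1. Get all the partitions of the topic
2. Iterate over the partitions and find the leader of each one
3. Consume the data

Here we take consuming data from partition 0 as the example and skip the first step.

Getting the leader of partition 0

Constructors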

class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String)

// kafka.javaapi.TopicMetadataRequest
def this(topics: java.util.List[String]) =
  this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)

Getting the leader

String host = "192.168.1.115";
int port = 9092;
int soTimeout = 1000*6;
int bufferSize = 512 * 1024;
String clientId = "clientId";
String topic = "test";
int paritionid = 0;

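// Construct the consumer. The host here is a single broker; a real cluster has many brokers,
// so you would iterate over them, get the partitions each broker knows about, and then find each partition's leader.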
SimpleConsumer simpleConsumer
        = new SimpleConsumer(host,port,soTimeout,bufferSize,clientId);

List<String> topics = Collections.singletonList(topic);
// Build the TopicMetadataRequest
TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);

String leader = "";

TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
for(TopicMetadata topicMetadata:topicMetadatas){
    List<PartitionMetadata> partitionMetadatas = topicMetadata.partitionsMetadata();
    for(PartitionMetadata partitionMetadata:partitionMetadatas){
        if(partitionMetadata.partitionId()==paritionid){
            leader = partitionMetadata.leader().host();
        }
    }
}

System.out.println(leader);

Result

192.168.1.115
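The lookup above dereferences `partitionMetadata.leader()` directly, which fails with a `NullPointerException` when the partition currently has no leader. A minimal sketch of a null-safe variant follows. `PartitionInfo` and `findLeaderHost` are hypothetical stand-ins introduced for illustration, not Kafka classes, so the example runs without the Kafka client on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class LeaderLookupSketch {

    /** Hypothetical stand-in for partition metadata: id plus leader host (null if none). */
    static final class PartitionInfo {
        final int partitionId;
        final String leaderHost;

        PartitionInfo(int partitionId, String leaderHost) {
            this.partitionId = partitionId;
            this.leaderHost = leaderHost;
        }
    }

    /** Returns the leader host of the given partition, or empty if unknown or leaderless. */
    static Optional<String> findLeaderHost(List<PartitionInfo> partitions, int partitionId) {
        for (PartitionInfo info : partitions) {
            if (info.partitionId == partitionId) {
                // ofNullable turns a missing leader into an empty Optional instead of an NPE.
                return Optional.ofNullable(info.leaderHost);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<PartitionInfo> metadata = Arrays.asList(
                new PartitionInfo(0, "192.168.1.115"),
                new PartitionInfo(1, null)); // partition 1 has no leader right now
        System.out.println(findLeaderHost(metadata, 0).orElse("<no leader>"));
        System.out.println(findLeaderHost(metadata, 1).orElse("<no leader>"));
    }
}
```

With the real API the same shape applies: check the leader for `null` before calling `host()`, and decide explicitly what to do when no leader is available.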

Getting the offset

kafka.javaapi.OffsetRequest
kafka.common.TopicAndPartition
kafka.api.PartitionOffsetRequestInfo

class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
                    versionId: Short,
                    clientId: String)

kafka.api.OffsetRequest.EarliestTime()
kafka.api.OffsetRequest.LatestTime()

Code

TopicAndPartition topicAndPartition = new TopicAndPartition(topic,paritionid);
long whichTime = kafka.api.OffsetRequest.LatestTime();

PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(whichTime,1);
Map<TopicAndPartition,PartitionOffsetRequestInfo> infoMap
        = new HashMap<TopicAndPartition,PartitionOffsetRequestInfo>();
infoMap.put(topicAndPartition,partitionOffsetRequestInfo);

kafka.javaapi.OffsetRequest offsetRequest
        = new kafka.javaapi.OffsetRequest(infoMap,kafka.api.OffsetRequest.CurrentVersion(), clientId);

OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(offsetRequest);

long readFromOffset = offsetsBefore.offsets(topic,paritionid)[0];
System.out.println(readFromOffset);

Offset

69690
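The expression `offsetsBefore.offsets(topic,paritionid)[0]` assumes the response carries at least one offset. If the array were empty, for example after a failed request, indexing it would throw `ArrayIndexOutOfBoundsException`. A small guard along these lines avoids that. `firstOffsetOrDefault` is a hypothetical helper name and the fallback value is an assumption that the caller has to choose.

```java
final class FirstOffsetSketch {

    /** Returns the first offset in the array, or the fallback when the array is null or empty. */
    static long firstOffsetOrDefault(long[] offsets, long fallback) {
        return (offsets == null || offsets.length == 0) ? fallback : offsets[0];
    }

    public static void main(String[] args) {
        // Normal case: the response contains one offset.
        System.out.println(firstOffsetOrDefault(new long[] {69690L}, 0L));
        // Empty response: the fallback is returned instead of throwing.
        System.out.println(firstOffsetOrDefault(new long[0], 0L));
    }
}
```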

Reading data

Here we start reading from offset 0 (requested with kafka.api.OffsetRequest.EarliestTime()), so readFromOffset = 0.

int fetchSize = 10000;

kafka.api.FetchRequest fetchRequest = new FetchRequestBuilder()
        .clientId(clientId)
        .addFetch(topic,paritionid,readFromOffset,fetchSize)
        .build();

FetchResponse fetch = simpleConsumer.fetch(fetchRequest);
ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic,paritionid);

Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
while (iterator.hasNext()){
    MessageAndOffset messageAndOffset = iterator.next();
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println("OFFSET:"+String.valueOf(messageAndOffset.offset()) + ",MESSAGE: " + new String(bytes));
}


OFFSET:0,MESSAGE: ssssssssssssssssssssss
OFFSET:1,MESSAGE: dddddddddddddddddddddddddddd
OFFSET:2,MESSAGE: 0
OFFSET:3,MESSAGE: 1
OFFSET:4,MESSAGE: 2
OFFSET:5,MESSAGE: 3
OFFSET:6,MESSAGE: 4
OFFSET:7,MESSAGE: 5
OFFSET:8,MESSAGE: 6
OFFSET:9,MESSAGE: 7
OFFSET:10,MESSAGE: 8
OFFSET:11,MESSAGE: 9
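The decoding step in the loop above sizes the array with `payload.limit()` and decodes with the platform default charset. A minimal sketch of a more defensive variant follows, assuming the messages are UTF-8 text (the encoding used by the producer is not stated here). It copies `remaining()` bytes from a duplicate of the buffer, so it also works when the buffer's position is not zero and it leaves the original buffer untouched. `decodeUtf8` is a hypothetical helper name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PayloadDecodeSketch {

    /** Copies the readable bytes of the buffer and decodes them as UTF-8. */
    static String decodeUtf8(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();      // independent position/limit, shared content
        byte[] bytes = new byte[view.remaining()];  // remaining() == limit - position
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap("xxhello".getBytes(StandardCharsets.UTF_8));
        buffer.position(2); // pretend the first two bytes were already consumed
        System.out.println(decodeUtf8(buffer));
        System.out.println(buffer.position()); // decoding did not move the original buffer
    }
}
```

Sizing the array with `limit()` only matches the number of readable bytes when the position is 0; with a non-zero position `get(bytes)` would throw `BufferUnderflowException`.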

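Note

The code above does no error handling, for example for leader changes or OffsetOutOfRangeCode; it is only meant to show the flow of consuming data with the low-level API. The complete low-level consumer code is as follows: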

package com.fan.kafka;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;


public class SimpleConsumerAPI {

    public static void main(String args[]) throws Exception {
        SimpleConsumerAPI example = new SimpleConsumerAPI();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleConsumerAPI() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
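        readOffset = 26000; // hard-coded start offset left in for this demo; remove this line to start from the latest offset looked up above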
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
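The inner loop of `run` skips messages whose offset is below `readOffset` and then advances `readOffset` to `nextOffset()`. The check matters because a fetch can return messages that start before the requested offset; the Kafka 0.8 SimpleConsumer example notes that this happens when messages are compressed and the broker returns a whole compressed block. A minimal stand-alone sketch of that bookkeeping follows. `Cursor` and `accept` are hypothetical names, and plain `long` offsets stand in for `MessageAndOffset`.

```java
import java.util.ArrayList;
import java.util.List;

final class OffsetAdvanceSketch {

    /** Mutable cursor holding the next offset the consumer wants to read. */
    static final class Cursor {
        long readOffset;

        Cursor(long readOffset) {
            this.readOffset = readOffset;
        }
    }

    /**
     * Processes a fetched batch of offsets: ignores those below the cursor,
     * returns the accepted ones and moves the cursor past the last accepted offset.
     */
    static List<Long> accept(long[] fetchedOffsets, Cursor cursor) {
        List<Long> accepted = new ArrayList<>();
        for (long offset : fetchedOffsets) {
            if (offset < cursor.readOffset) {
                continue; // before the requested position, so skip it
            }
            accepted.add(offset);
            cursor.readOffset = offset + 1; // plays the role of nextOffset()
        }
        return accepted;
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor(12);
        // The batch starts at 10 although offset 12 was requested.
        System.out.println(accept(new long[] {10, 11, 12, 13, 14}, cursor));
        System.out.println(cursor.readOffset);
    }
}
```

Because the cursor only moves forward when a message is accepted, retrying the same fetch after an error does not deliver a message twice within one run.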
