Kafka low-level consumer explained
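Kafka is a high-throughput distributed publish-subscribe messaging system and an important part of big-data systems. At the time of writing, the latest release is kafka_2.11-0.10.2.0. Since version 0.9.0 the consumer API has been unified and no longer distinguishes between high-level and low-level consumers, but many companies still run the old API, so this post takes another look at the low-level API.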
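Figure: Kafka SimpleConsumer data consumption flow chart
Figure: topic logical structure diagram
Based on the topic logical structure diagram, here is a rough outline of the consumption flow: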
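- A topic usually has multiple partitions, which increases the topic's message throughput. Each partition also has multiple replicas (for example repl1-1, repl1-2, repl1-3), which improves data availability: data is not lost when some nodes go down.
- Because a partition has multiple replicas, reads and writes go to the leader (the active replica) first, and the follower replicas then sync the data by replication. So to read from a partition you must first find its leader (steps 1 and 2 in the flow chart).
- Once you have the partition leader, decide where to start reading, that is, from which offset. Usually one of the following two is specified (steps 3 and 4). You can also start from a specific offset, in which case you have to maintain the consumed offsets yourself, for example in ZooKeeper, HDFS, or a database.
  - smallest: start from the smallest offset
  - largest: start from the current largest offset
- Fetch the messages (getMessage, steps 5 and 6)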
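The choice of starting position described in the list above can be sketched in plain Java. This is only an illustration and does not call Kafka: the class `StartOffsetSketch`, the method `resolveStartOffset`, and the `policy` strings are hypothetical names, and the `stored` value stands in for whatever an application would keep in ZooKeeper, HDFS, or a database.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of the Kafka API: picks the offset to start reading from.
class StartOffsetSketch {

    // earliest and latest are the bounds the broker currently reports for the partition.
    // stored is an offset the application saved earlier (empty if nothing was saved).
    static long resolveStartOffset(OptionalLong stored, String policy, long earliest, long latest) {
        if (stored.isPresent()) {
            long s = stored.getAsLong();
            // A saved offset is only usable if it is still inside the broker's range.
            if (s >= earliest && s <= latest) {
                return s;
            }
        }
        if ("smallest".equals(policy)) {
            return earliest;
        }
        if ("largest".equals(policy)) {
            return latest;
        }
        throw new IllegalArgumentException("unknown policy: " + policy);
    }

    public static void main(String[] args) {
        // No saved offset: fall back to the policy.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "smallest", 0L, 69690L));
        System.out.println(resolveStartOffset(OptionalLong.empty(), "largest", 0L, 69690L));
        // A saved offset inside the valid range wins over the policy.
        System.out.println(resolveStartOffset(OptionalLong.of(26000L), "largest", 0L, 69690L));
    }
}
```

Falling back to the policy when the saved offset is out of range is one possible design; an application could equally choose to fail loudly in that case.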
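Code
In a production environment, the complete steps for consuming data from a topic are:
1. Get all the partitions
2. Iterate over the partitions and find the leader of each one
3. Consume the data
Here we take consuming data from partition 0 as the example and skip the first step.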
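Steps 1 and 2 can be pictured with a toy model of topic metadata. The sketch below is plain Java and uses none of the Kafka classes: `PartitionLeaderSketch`, `PartitionInfo`, and the broker host names are all made up for illustration, and a `null` leader is an assumed way of representing a partition whose leader is not currently known.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of topic metadata; none of these types come from Kafka.
class PartitionLeaderSketch {

    // Metadata for one partition: its id, the leader host (null when unknown),
    // and the hosts that hold its replicas.
    static final class PartitionInfo {
        final int id;
        final String leader;
        final List<String> replicas;

        PartitionInfo(int id, String leader, List<String> replicas) {
            this.id = id;
            this.leader = leader;
            this.replicas = replicas;
        }
    }

    // Walks every partition and records its leader, skipping partitions without one.
    static Map<Integer, String> leadersByPartition(List<PartitionInfo> partitions) {
        Map<Integer, String> leaders = new LinkedHashMap<>();
        for (PartitionInfo p : partitions) {
            if (p.leader != null) {
                leaders.put(p.id, p.leader);
            }
        }
        return leaders;
    }

    public static void main(String[] args) {
        List<PartitionInfo> partitions = new ArrayList<>();
        partitions.add(new PartitionInfo(0, "broker-a", Arrays.asList("broker-a", "broker-b")));
        partitions.add(new PartitionInfo(1, "broker-b", Arrays.asList("broker-b", "broker-c")));
        partitions.add(new PartitionInfo(2, null, Arrays.asList("broker-c", "broker-a")));
        // Step 3 would open one connection per leader and fetch from it.
        System.out.println(leadersByPartition(partitions));
    }
}
```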
Finding the leader of partition 0
Constructors
class SimpleConsumer(val host: String,
val port: Int,
val soTimeout: Int ,
val bufferSize: Int,
val clientId: String)
import kafka.javaapi.TopicMetadataRequest;
def this(topics: java.util.List[String]) =
this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
Finding the leader
String host = "192.168.1.115";
int port = 9092;
int soTimeout = 1000*6;
int bufferSize = 512 * 1024;
String clientId = "clientId";
String topic = "test";
int paritionid = 0;
// Build the consumer. The host here is one broker; a real cluster has many brokers,
// so iterate over all of them, get the partitions on each broker, and then find each partition's leader
SimpleConsumer simpleConsumer
= new SimpleConsumer(host,port,soTimeout,bufferSize,clientId);
List<String> topics = Collections.singletonList(topic);
// Build the TopicMetadataRequest
TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);
String leader = "";
TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);
List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
for(TopicMetadata topicMetadata:topicMetadatas){
List<PartitionMetadata> partitionMetadatas = topicMetadata.partitionsMetadata();
for(PartitionMetadata partitionMetadata:partitionMetadatas){
if(partitionMetadata.partitionId()==paritionid){
leader = partitionMetadata.leader().host();
}
}
}
System.out.println(leader);
Result
192.168.1.115
Getting the offset
kafka.javaapi.OffsetRequest
kafka.common.TopicAndPartition
kafka.api.PartitionOffsetRequestInfo
class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
versionId: Short,
clientId: String)
kafka.api.OffsetRequest.EarliestTime()
kafka.api.OffsetRequest.LatestTime()
Code
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,paritionid);
long whichTime = kafka.api.OffsetRequest.LatestTime();
PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(whichTime,1);
Map<TopicAndPartition,PartitionOffsetRequestInfo> infoMap
= new HashMap<TopicAndPartition,PartitionOffsetRequestInfo>();
infoMap.put(topicAndPartition,partitionOffsetRequestInfo);
kafka.javaapi.OffsetRequest offsetRequest
= new kafka.javaapi.OffsetRequest(infoMap,kafka.api.OffsetRequest.CurrentVersion(), clientId);
OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(offsetRequest);
long readFromOffset = offsetsBefore.offsets(topic,paritionid)[0];
System.out.println(readFromOffset);
Offset
69690
Reading data
Here we start reading from offset 0 (obtained with kafka.api.OffsetRequest.EarliestTime()), so readFromOffset is 0.
int fetchSize = 10000;
kafka.api.FetchRequest fetchRequest = new FetchRequestBuilder()
.clientId(clientId)
.addFetch(topic,paritionid,readFromOffset,fetchSize)
.build();
FetchResponse fetch = simpleConsumer.fetch(fetchRequest);
ByteBufferMessageSet messageAndOffsets = fetch.messageSet(topic,paritionid);
Iterator<MessageAndOffset> iterator = messageAndOffsets.iterator();
while (iterator.hasNext()){
MessageAndOffset messageAndOffset = iterator.next();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println("OFFSET:"+String.valueOf(messageAndOffset.offset()) + ",MESSAGE: " + new String(bytes));
}
OFFSET:0,MESSAGE: ssssssssssssssssssssss
OFFSET:1,MESSAGE: dddddddddddddddddddddddddddd
OFFSET:2,MESSAGE: 0
OFFSET:3,MESSAGE: 1
OFFSET:4,MESSAGE: 2
OFFSET:5,MESSAGE: 3
OFFSET:6,MESSAGE: 4
OFFSET:7,MESSAGE: 5
OFFSET:8,MESSAGE: 6
OFFSET:9,MESSAGE: 7
OFFSET:10,MESSAGE: 8
OFFSET:11,MESSAGE: 9
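The read loop above sizes its byte array with `payload.limit()` and decodes with the platform default charset. That works when the buffer's position is 0 and the messages are in the default encoding. A slightly more defensive variant, sketched here with only the JDK, sizes the array with `remaining()` and names the charset explicitly. The class `PayloadDecodeSketch` and the method `decodeUtf8` are hypothetical names, and UTF-8 is an assumption about how the producer encoded the messages.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for turning a message payload into a String.
class PayloadDecodeSketch {

    // Copies the readable bytes out of the buffer and decodes them as UTF-8.
    // Working on a duplicate leaves the caller's position and limit untouched.
    static String decodeUtf8(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
        System.out.println("MESSAGE: " + decodeUtf8(payload));
    }
}
```

Inside the loop this would replace the three lines that build `bytes` and call `new String(bytes)`.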
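Note
The code so far has no error handling for cases such as a leader change or OffsetOutOfRangeCode; it is only meant to show the flow of low-level consumption. The complete low-level consumer code is as follows: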
package com.fan.kafka;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.*;
public class SimpleConsumerAPI {
public static void main(String args[]) throws Exception {
SimpleConsumerAPI example = new SimpleConsumerAPI();
long maxReads = Long.parseLong(args[0]);
String topic = args[1];
int partition = Integer.parseInt(args[2]);
List<String> seeds = new ArrayList<String>();
seeds.add(args[3]);
int port = Integer.parseInt(args[4]);
try {
example.run(maxReads, topic, partition, seeds, port);
} catch (Exception e) {
System.out.println("Oops:" + e);
e.printStackTrace();
}
}
private List<String> m_replicaBrokers = new ArrayList<String>();
public SimpleConsumerAPI() {
m_replicaBrokers = new ArrayList<String>();
}
public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
// find the meta data about the topic and partition we are interested in
//
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
System.out.println("Can't find metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null) {
System.out.println("Can't find Leader for Topic and Partition. Exiting");
return;
}
String leadBroker = metadata.leader().host();
String clientName = "Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
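// NOTE: hard-coded override, presumably left over from debugging. It discards the offset
// fetched on the previous line; remove it to start from the latest offset.
readOffset = 26000;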
int numErrors = 0;
while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
}
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
.build();
FetchResponse fetchResponse = consumer.fetch(req);
if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
if (numErrors > 5) break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for the last element to reset
readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
continue;
}
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
continue;
}
numErrors = 0;
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
if (currentOffset < readOffset) {
System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
numRead++;
a_maxReads--;
}
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
if (consumer != null) consumer.close();
}
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
goToSleep = true;
} else {
return metadata.leader().host();
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
}
System.out.println("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop:
for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
+ ", " + a_partition + "] Reason: " + e);
} finally {
if (consumer != null) consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
}
}
return returnMetaData;
}
}
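The error-handling branches inside the fetch loop of `run()` are easy to lose among the I/O calls. As a reading aid, the decision they encode can be isolated into a small pure function. This is a sketch, not part of the Kafka API: `FetchErrorPolicySketch`, `Action`, and `onFetchError` are hypothetical names, and the boolean parameter stands in for the comparison against `ErrorMapping.OffsetOutOfRangeCode()`.

```java
// Hypothetical summary of the decision made after a failed fetch in run().
class FetchErrorPolicySketch {

    enum Action { RESET_OFFSET, FIND_NEW_LEADER, GIVE_UP }

    // Same limit as the "numErrors > 5" check in run().
    static final int MAX_CONSECUTIVE_ERRORS = 5;

    // consecutiveErrors is the error count after it has been incremented for this failure.
    // The branch order follows run(): give up first, then reset the offset, then fail over.
    static Action onFetchError(int consecutiveErrors, boolean offsetOutOfRange) {
        if (consecutiveErrors > MAX_CONSECUTIVE_ERRORS) {
            return Action.GIVE_UP;
        }
        if (offsetOutOfRange) {
            // The requested offset is invalid: ask the broker for a valid one and retry.
            return Action.RESET_OFFSET;
        }
        // Any other error is treated as a possible leader change: close the connection
        // and look up the new leader among the known replicas.
        return Action.FIND_NEW_LEADER;
    }

    public static void main(String[] args) {
        System.out.println(onFetchError(1, true));
        System.out.println(onFetchError(1, false));
        System.out.println(onFetchError(6, false));
    }
}
```

Note that a successful fetch resets the counter to 0 in `run()`, so the limit only applies to errors that occur back to back.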