1. 程式人生 > >kafka原始碼分析之consumer的原始碼

kafka原始碼分析之consumer的原始碼

Consumerclient

示例程式碼

Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG"localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG"DemoConsumer");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG"false");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG

"30000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG

"org.apache.kafka.common.serialization.ByteArrayDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

"org.apache.kafka.common.serialization.ByteArrayDeserializer");

consumer = new KafkaConsumer<>(props);this

.topic = topic;

consumer.subscribe(Collections.singletonList(this.topic));

//下面的傳入一個listener這個部分的註釋如果需要對partition在當前的consumer中分配或者取消分配時做一定的操作時(比如取消分配時提交offset),可以實現這個介面。

//subscribe(List<String> topicsConsumerRebalanceListener listener)

while(true) {ConsumerRecords<IntegerString> records = 

consumer.poll(1000);for (ConsumerRecord<IntegerString> record : records) {     System.out.println("Received message: (" + record.key() + ", " + record.value() 

         + ") at offset " + record.offset());

}

   consumer.commitSync()

}

生成KafkaConsumer例項

@SuppressWarnings("unchecked")private KafkaConsumer(ConsumerConfig config,Deserializer<K> keyDeserializer,Deserializer<V> valueDeserializer) {try {log.debug("Starting the Kafka consumer");

根據配置資訊,得到如下三個配置的配置值,並檢查配置的合法:

1,讀取request.timeout.ms配置項的值,預設值為40秒。用於配置請求的超時時間。

2,讀取session.timeout.ms配置項的值,預設值為30秒,用於配置當前的consumer的session的超時時間,也就是client端多長時間不給server傳送心跳就表示這個client端超時。

3,讀取fetch.max.wait.ms配置項的值,預設值為500ms。用於配置從server中讀取資料最長的等待時間。this.requestTimeoutMs = config.getInt(

                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);int sessionTimeOutMs = config.getInt(

                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);int fetchMaxWaitMs = config.getInt(

                ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);

如果請求超時時間不是一個大於session的超時時間的值或者請求超時時間不是一個大於fetch的最大等待時間的值時,表示requestTimeoutMs的配置不合法,直接throw exception.if (this.requestTimeoutMs <= sessionTimeOutMs ||

this.requestTimeoutMs <= fetchMaxWaitMs)throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG 

" should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG 

" and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);

this.time new SystemTime();MetricConfig metricConfig = new MetricConfig().samples(

                    config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))                .timeWindow(config.getLong(

                        ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS);

這裡得到對應的consumer的client端id的client.id配置,如果這個值沒有配置時,預設隨機生成一個。clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);if (clientId.length() <= 0)clientId "consumer-" CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();

List<MetricsReporter> reporters = config.getConfiguredInstances(

                ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,MetricsReporter.class);reporters.add(new JmxReporter(JMX_PREFIX));this.metrics new Metrics(metricConfigreporterstime);

讀取retry.backoff.ms配置的值,預設值為100ms,用於配置重試的間隔週期。

this.retryBackoffMs = config.getLong(

              ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

根據重試的間隔週期加上metadata.max.age.ms配置項的值生成Metadata例項,

配置metadata.max.age.ms項預設值為5分鐘,用於設定metadata定期重新讀取的生命週期。this.metadata new Metadata(retryBackoffMs

config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));

讀取bootstrap.servers配置的要讀取的kafka brokers的配置列表,並根據broker的連線資訊,生成Cluster例項,並把Cluster例項更新到metadata的例項。List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(

              config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));this.metadata.update(Cluster.bootstrap(addresses)0);

String metricGrpPrefix = "consumer";Map<StringString> metricsTags = new LinkedHashMap<StringString>();metricsTags.put("client-id"clientId);

生成NetworkClient的例項,生成例項需要如下幾個配置檔案:

1,配置項connections.max.idle.ms,預設值9分鐘,用於配置連線最大的空閒時間(每個連線的最大連線佇列為100)。

2,配置項reconnect.backoff.ms,預設值50ms,用於配置連線斷開後重新連線的間隔時間。

3,配置項send.buffer.bytes,預設值128kb,用於配置SOCKET的SO_SNDBUF傳送資料的緩衝區大小。

4,配置項receive.buffer.bytes,預設值32kb,用於配置SOCKET的SO_RCVBUF接收資料的緩衝區大小。

5,讀取request.timeout.ms配置項的值,預設值為40秒。用於配置請求的超時時間。ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(

               config.values());NetworkClient netClient = new NetworkClient(

new Selector(config.getLong(

                   ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),

metricstimemetricGrpPrefixmetricsTagschannelBuilder),this.metadata,clientId,100// a fixed large enough value will sufficeconfig.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)time);

根據retry.backoff.ms配置的值,生成一個ConsumerNetworkClient的例項。this.client new ConsumerNetworkClient(netClientmetadatatime

retryBackoffMs);

讀取auto.offset.reset配置項的值,預設值為latest。可配置("latest""earliest""none"),這個配置用於在讀取partition的offset超出範圍時,對offset進行重置的規則。OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(

                    config.getString(

                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());

生成用於管理訂閱的topic的partition的狀態管理的元件,用於管理partition的狀態與當前的offset的資訊。this.subscriptions new SubscriptionState(offsetResetStrategy);

生成用於管理相同的一個groupId下的多個client端的partition的分割槽控制,

通過partition.assignment.strategy配置,預設例項為RangeAssignorList<PartitionAssignor> assignors = config.getConfiguredInstances(                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,PartitionAssignor.class);

生成用於對consumer進行協調的例項,這個例項依賴如下配置:

1,配置項group.id,用於配置consumer對應的訂閱的組名稱,相同的組的多個client端進行協調消費處理。

2,讀取session.timeout.ms配置項的值,預設值為30秒,用於配置當前的consumer的session的超時時間,也就是client端多長時間不給server傳送心跳就表示這個client端超時。

3,配置項heartbeat.interval.ms,預設值3秒,用於定時向server傳送心跳的時間間隔。

4,根據retry.backoff.ms配置的值來設定讀取資訊失敗的重試間隔。

5,配置項enable.auto.commit,預設值true,設定是否自動提交消費過的offset的值的設定。

5,配置項auto.commit.interval.ms,預設值5秒,如果設定有自動提交offset時,自動提交的間隔時間。this.coordinator new ConsumerCoordinator(this.client,config.getString(ConsumerConfig.GROUP_ID_CONFIG),config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),assignors,this.metadata,this.subscriptions,metrics,metricGrpPrefix,metricsTags,this.time,retryBackoffMs,new ConsumerCoordinator.DefaultOffsetCommitCallback(),config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));

根據key.deserializer配置與value.deserializer配置的key,value的反序列化的配置,生成反序列化訊息的例項。這個類必須是實現Deserializer介面的類。if (keyDeserializer == null) {this.keyDeserializer = config.getConfiguredInstance(

                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.keyDeserializer.configure(config.originals()true);else {            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);this.keyDeserializer = keyDeserializer;}

if (valueDeserializer == null) {this.valueDeserializer = config.getConfiguredInstance(

                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.valueDeserializer.configure(config.originals()false);else {            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);this.valueDeserializer = valueDeserializer;}

生成用於具體讀取訊息的讀取例項,這個例項依賴如下幾個配置資訊:

1,配置項fetch.min.bytes,預設值1,用於設定每次讀取的最小byte數。

2,配置項fetch.max.wait.ms,預設值500ms,用於設定每次讀取的最大等待時間。

3,配置項max.partition.fetch.bytes,預設值1MB,用於設定每個partition每次讀取的最大的資料量。

4,配置項check.crcs,預設值true,用於設定是否校驗資料的完整性。

根據retry.backoff.ms配置的值來設定讀取資訊失敗的重試間隔。this.fetcher new Fetcher<>(this.client,config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),this.keyDeserializer,this.valueDeserializer,this.metadata,this.subscriptions,metrics,metricGrpPrefix,metricsTags,this.time,this.retryBackoffMs);config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIXclientId);log.debug("Kafka consumer created");catch (Throwable t) {// call close methods if internal objects are already constructed        // this is to prevent resource leak. see KAFKA-2121close(true);// now propagate the exceptionthrow new KafkaException("Failed to construct kafka consumer"t);}}

訂閱topic

要對一個topic進行訂閱時,在consumer生成後,通過呼叫如下的函式來進行訂閱,

第二個引數是對一個partition的分配與取消分析時的監聽操作,可用於監聽對分配到當前的consumer或者取消時的其它操作。

public void subscribe(List<String> topicsConsumerRebalanceListener listener) {    acquire();try {if (topics.isEmpty()) {// treat subscribing to empty topic list as the same as unsubscribingthis.unsubscribe();else {log.debug(

相關推薦

kafka原始碼分析consumer原始碼

Consumer的client端 示例程式碼 Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SER

【kubernetes/k8s原始碼分析】kubelet原始碼分析cdvisor原始碼分析

  資料流 UnsecuredDependencies -> run   1. cadvisor.New初始化 if kubeDeps.CAdvisorInterface == nil { imageFsInfoProvider := cadv

spark原始碼分析Master原始碼主備切換機制分析

Master原始碼分析之主備切換機制 1.當選為leader之後的操作 //ElectedLeader 當選leader case ElectedLeader => {

Android原始碼分析Glide原始碼分析&基礎版ImageLoader框架

1 Glide原始碼分析   Glide是一款由Bump Technologies開發的圖片載入框架,使得我們可以在Android平臺上以極度簡單的方式載入和展示圖片。本部落格基於Glide 3.7.0版本來進行講解,這個版本的Glide相當成熟和穩定。

Mybatis深入原始碼分析SqlSessionFactoryBuilder原始碼分析

一:原始碼分析程式碼片段 public static void main(String[] args) {

Kafka 原始碼分析LogSegment

這裡分析kafka LogSegment原始碼 通過一步步分析LogManager,Log原始碼之後就會發現,最終的log操作都在LogSegment上實現.LogSegment負責分片的讀寫恢復重新整理刪除等動作都在這裡實現.LogSegment程式碼同樣在原始碼目錄log下. LogSe

kafka原始碼分析kafkaApis

KafkaApis 說明:用於處理對kafka的訊息請求的中心轉發元件,kafkaapis需要依賴於如下幾個元件: apis = new KafkaApis(socketServer.requestChannel, replicaManager,  consumer

kafka原始碼分析producer

Producer的client端 示例程式碼 Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("client.id", "De

kafka原始碼分析kafkacluster的管理-KafkaController

KafkaController 說明,這個例項主要用於對kafka cluster進行管理,一個kafka的cluster表示同一個zk環境下所有的broker的集合,在這個cluster中需要有一個broker被選舉成為leader,用於管理其它的broker的上線與

分散式訊息佇列RocketMQ原始碼分析3 -- Consumer負載均衡機制 -- Rebalance

同Kafka一樣,RocketMQ也需要探討一個問題:如何把一個topic的多個queue分攤給不同的consumer,也就是負載均衡問題。 有興趣朋友可以關注公眾號“架構之道與術”, 獲取最新文章。 或掃描如下二維碼: 在討論這個問題之前,我們先看一

Spark原始碼分析Spark Shell(上)

https://www.cnblogs.com/xing901022/p/6412619.html 文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。 bin目錄結構: -rwxr-xr-x. 1 bigdata bigdata 1089 Dec

Netty 原始碼分析拆包器的奧祕

為什麼要粘包拆包 為什麼要粘包 首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資

Android原始碼分析為什麼在onCreate() 和 onResume() 獲取不到 View 的寬高

轉載自:https://www.jianshu.com/p/d7ab114ac1f7 先來看一段很熟悉的程式碼,可能在最開始接觸安卓的時候,大部分人都寫過的一段程式碼;即嘗試在 onCreate() 和 onResume() 方法中去獲取某個 View 的寬高資訊: 但是列印輸出後,我們會發

netty原始碼分析服務端啟動

ServerBootstrap與Bootstrap分別是netty中服務端與客戶端的引導類,主要負責服務端與客戶端初始化、配置及啟動引導等工作,接下來我們就通過netty原始碼中的示例對ServerBootstrap與Bootstrap的原始碼進行一個簡單的分析。首先我們知道這兩個類都繼承自AbstractB

SNMP原始碼分析(一)配置檔案部分

snmpd.conf想必不陌生。在程序啟動過程中會去讀取配置檔案中各個配置。其中幾個引數需要先知道是幹什麼的:   token:配置檔案的每行的開頭,例如 group MyROGroup v1 readSec 這行token的引數是group。  

【kubernetes/k8s原始碼分析】kubelet原始碼分析容器網路初始化原始碼分析

一. 網路基礎   1.1 網路名稱空間的操作 建立網路名稱空間: ip netns add 名稱空間內執行命令: ip netns exec 進入名稱空間: ip netns exec bash   1.2 bridge-nf-c

【kubernetes/k8s原始碼分析】kubelet原始碼分析資源上報

0. 資料流   路徑: pkg/kubelet/kubelet.go   Run函式() ->   syncNodeStatus ()  ->   registerWithAPIServer() ->

【kubernetes/k8s原始碼分析】kubelet原始碼分析啟動容器

主要是呼叫runtime,這裡預設為docker 0. 資料流 NewMainKubelet(cmd/kubelet/app/server.go) -> NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime

Android系統原始碼分析-ContentProvider

距離上一次寫部落格已經半年多了,這半年發生了很多事情,也有了很多感觸,最主要是改變了忙碌了工作,更加重視身體的健康,為此也把工作地點從深圳這個一線城市換到了珠海,工作相對沒有那麼累,身體感覺也好了很多。所以在工作完成之餘,也有了更多的時間來自我學習和提高,後續會用更多時間來寫更多實用的東西,幫助我們理解

Vue 原始碼分析proxy代理

Vue 原始碼分析之proxy代理 當我們在使用Vue進行資料設定時,通常初始化格式為: let data = { age: 12, name: 'yang' } // 例項化Vue物件 let vm = new Vue({ data })