kafka原始碼分析之consumer的原始碼
Consumer的client端
示例程式碼
Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumer = new KafkaConsumer<>(props);this
consumer.subscribe(Collections.singletonList(this.topic));
//下面的傳入一個listener這個部分的註釋如果需要對partition在當前的consumer中分配或者取消分配時做一定的操作時(比如取消分配時提交offset),可以實現這個介面。
//subscribe(List<String> topics, ConsumerRebalanceListener listener)
while(true) {ConsumerRecords<Integer, String> records =
+ ") at offset " + record.offset());
}
consumer.commitSync()
}
生成KafkaConsumer例項
@SuppressWarnings("unchecked")private KafkaConsumer(ConsumerConfig config,Deserializer<K> keyDeserializer,Deserializer<V> valueDeserializer) {try {log.debug("Starting the Kafka consumer");
根據配置資訊,得到如下三個配置的配置值,並檢查配置的合法:
1,讀取request.timeout.ms配置項的值,預設值為40秒。用於配置請求的超時時間。
2,讀取session.timeout.ms配置項的值,預設值為30秒,用於配置當前的consumer的session的超時時間,也就是client端多長時間不給server傳送心跳就表示這個client端超時。
3,讀取fetch.max.wait.ms配置項的值,預設值為500ms。用於配置從server中讀取資料最長的等待時間。this.requestTimeoutMs = config.getInt(
ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);int sessionTimeOutMs = config.getInt(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);int fetchMaxWaitMs = config.getInt(
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
如果請求超時時間不是一個大於session的超時時間的值或者請求超時時間不是一個大於fetch的最大等待時間的值時,表示requestTimeoutMs的配置不合法,直接throw exception.if (this.requestTimeoutMs <= sessionTimeOutMs ||
this.requestTimeoutMs <= fetchMaxWaitMs)throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG
+ " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG
+ " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
this.time = new SystemTime();MetricConfig metricConfig = new MetricConfig().samples(
config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(
ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),TimeUnit.MILLISECONDS);
這裡得到對應的consumer的client端id的client.id配置,如果這個值沒有配置時,預設隨機生成一個。clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);if (clientId.length() <= 0)clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
List<MetricsReporter> reporters = config.getConfiguredInstances(
ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,MetricsReporter.class);reporters.add(new JmxReporter(JMX_PREFIX));this.metrics = new Metrics(metricConfig, reporters, time);
讀取retry.backoff.ms配置的值,預設值為100ms,用於配置重試的間隔週期。
this.retryBackoffMs = config.getLong(
ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
根據重試的間隔週期加上metadata.max.age.ms配置項的值生成Metadata例項,
配置metadata.max.age.ms項預設值為5分鐘,用於設定metadata定期重新讀取的生命週期。this.metadata = new Metadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
讀取bootstrap.servers配置的要讀取的kafka brokers的配置列表,並根據broker的連線資訊,生成Cluster例項,並把Cluster例項更新到metadata的例項。List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));this.metadata.update(Cluster.bootstrap(addresses), 0);
String metricGrpPrefix = "consumer";Map<String, String> metricsTags = new LinkedHashMap<String, String>();metricsTags.put("client-id", clientId);
生成NetworkClient的例項,生成例項需要如下幾個配置檔案:
1,配置項connections.max.idle.ms,預設值9分鐘,用於配置連線最大的空閒時間(每個連線的最大連線佇列為100)。
2,配置項reconnect.backoff.ms,預設值50ms,用於配置連線斷開後重新連線的間隔時間。
3,配置項send.buffer.bytes,預設值128kb,用於配置SOCKET的SO_SNDBUF傳送資料的緩衝區大小。
4,配置項receive.buffer.bytes,預設值32kb,用於配置SOCKET的SO_RCVBUF接收資料的緩衝區大小。
5,讀取request.timeout.ms配置項的值,預設值為40秒。用於配置請求的超時時間。ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(
config.values());NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(
ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics, time, metricGrpPrefix, metricsTags, channelBuilder),this.metadata,clientId,100, // a fixed large enough value will sufficeconfig.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
根據retry.backoff.ms配置的值,生成一個ConsumerNetworkClient的例項。this.client = new ConsumerNetworkClient(netClient, metadata, time,
retryBackoffMs);
讀取auto.offset.reset配置項的值,預設值為latest。可配置("latest", "earliest", "none"),這個配置用於在讀取partition的offset超出範圍時,對offset進行重置的規則。OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(
config.getString(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
生成用於管理訂閱的topic的partition的狀態管理的元件,用於管理partition的狀態與當前的offset的資訊。this.subscriptions = new SubscriptionState(offsetResetStrategy);
生成用於管理相同的一個groupId下的多個client端的partition的分割槽控制,
通過partition.assignment.strategy配置,預設例項為RangeAssignor。List<PartitionAssignor> assignors = config.getConfiguredInstances( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,PartitionAssignor.class);
生成用於對consumer進行協調的例項,這個例項依賴如下配置:
1,配置項group.id,用於配置consumer對應的訂閱的組名稱,相同的組的多個client端進行協調消費處理。
2,讀取session.timeout.ms配置項的值,預設值為30秒,用於配置當前的consumer的session的超時時間,也就是client端多長時間不給server傳送心跳就表示這個client端超時。
3,配置項heartbeat.interval.ms,預設值3秒,用於定時向server傳送心跳的時間間隔。
4,根據retry.backoff.ms配置的值來設定讀取資訊失敗的重試間隔。
5,配置項enable.auto.commit,預設值true,設定是否自動提交消費過的offset的值的設定。
5,配置項auto.commit.interval.ms,預設值5秒,如果設定有自動提交offset時,自動提交的間隔時間。this.coordinator = new ConsumerCoordinator(this.client,config.getString(ConsumerConfig.GROUP_ID_CONFIG),config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),assignors,this.metadata,this.subscriptions,metrics,metricGrpPrefix,metricsTags,this.time,retryBackoffMs,new ConsumerCoordinator.DefaultOffsetCommitCallback(),config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
根據key.deserializer配置與value.deserializer配置的key,value的反序列化的配置,生成反序列化訊息的例項。這個類必須是實現Deserializer介面的類。if (keyDeserializer == null) {this.keyDeserializer = config.getConfiguredInstance(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.keyDeserializer.configure(config.originals(), true);} else { config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);this.keyDeserializer = keyDeserializer;}
if (valueDeserializer == null) {this.valueDeserializer = config.getConfiguredInstance(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,Deserializer.class);this.valueDeserializer.configure(config.originals(), false);} else { config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);this.valueDeserializer = valueDeserializer;}
生成用於具體讀取訊息的讀取例項,這個例項依賴如下幾個配置資訊:
1,配置項fetch.min.bytes,預設值1,用於設定每次讀取的最小byte數。
2,配置項fetch.max.wait.ms,預設值500ms,用於設定每次讀取的最大等待時間。
3,配置項max.partition.fetch.bytes,預設值1MB,用於設定每個partition每次讀取的最大的資料量。
4,配置項check.crcs,預設值true,用於設定是否校驗資料的完整性。
根據retry.backoff.ms配置的值來設定讀取資訊失敗的重試間隔。this.fetcher = new Fetcher<>(this.client,config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),this.keyDeserializer,this.valueDeserializer,this.metadata,this.subscriptions,metrics,metricGrpPrefix,metricsTags,this.time,this.retryBackoffMs);config.logUnused();AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);log.debug("Kafka consumer created");} catch (Throwable t) {// call close methods if internal objects are already constructed // this is to prevent resource leak. see KAFKA-2121close(true);// now propagate the exceptionthrow new KafkaException("Failed to construct kafka consumer", t);}}
訂閱topic
要對一個topic進行訂閱時,在consumer生成後,通過呼叫如下的函式來進行訂閱,
第二個引數是對一個partition的分配與取消分析時的監聽操作,可用於監聽對分配到當前的consumer或者取消時的其它操作。
public void subscribe(List<String> topics, ConsumerRebalanceListener listener) { acquire();try {if (topics.isEmpty()) {// treat subscribing to empty topic list as the same as unsubscribingthis.unsubscribe();} else {log.debug(
Consumer的client端
示例程式碼
Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SER
資料流
UnsecuredDependencies -> run
1. cadvisor.New初始化
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadv
Master原始碼分析之主備切換機制
1.當選為leader之後的操作
//ElectedLeader 當選leader
case ElectedLeader => {
1 Glide原始碼分析
Glide是一款由Bump Technologies開發的圖片載入框架,使得我們可以在Android平臺上以極度簡單的方式載入和展示圖片。本部落格基於Glide 3.7.0版本來進行講解,這個版本的Glide相當成熟和穩定。
一:原始碼分析程式碼片段
public static void main(String[] args) {
這裡分析kafka LogSegment原始碼
通過一步步分析LogManager,Log原始碼之後就會發現,最終的log操作都在LogSegment上實現.LogSegment負責分片的讀寫恢復重新整理刪除等動作都在這裡實現.LogSegment程式碼同樣在原始碼目錄log下.
LogSe
KafkaApis
說明:用於處理對kafka的訊息請求的中心轉發元件,kafkaapis需要依賴於如下幾個元件:
apis = new KafkaApis(socketServer.requestChannel, replicaManager,
consumer
Producer的client端
示例程式碼
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("client.id", "De
KafkaController
說明,這個例項主要用於對kafka cluster進行管理,一個kafka的cluster表示同一個zk環境下所有的broker的集合,在這個cluster中需要有一個broker被選舉成為leader,用於管理其它的broker的上線與
同Kafka一樣,RocketMQ也需要探討一個問題:如何把一個topic的多個queue分攤給不同的consumer,也就是負載均衡問題。
有興趣朋友可以關注公眾號“架構之道與術”, 獲取最新文章。
或掃描如下二維碼:
在討論這個問題之前,我們先看一
https://www.cnblogs.com/xing901022/p/6412619.html
文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。
bin目錄結構:
-rwxr-xr-x. 1 bigdata bigdata 1089 Dec 為什麼要粘包拆包
為什麼要粘包
首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資
轉載自:https://www.jianshu.com/p/d7ab114ac1f7
先來看一段很熟悉的程式碼,可能在最開始接觸安卓的時候,大部分人都寫過的一段程式碼;即嘗試在 onCreate() 和 onResume() 方法中去獲取某個 View 的寬高資訊:
但是列印輸出後,我們會發 ServerBootstrap與Bootstrap分別是netty中服務端與客戶端的引導類,主要負責服務端與客戶端初始化、配置及啟動引導等工作,接下來我們就通過netty原始碼中的示例對ServerBootstrap與Bootstrap的原始碼進行一個簡單的分析。首先我們知道這兩個類都繼承自AbstractB snmpd.conf想必不陌生。在程序啟動過程中會去讀取配置檔案中各個配置。其中幾個引數需要先知道是幹什麼的:
token:配置檔案的每行的開頭,例如
group MyROGroup v1 readSec
這行token的引數是group。
一. 網路基礎
1.1 網路名稱空間的操作
建立網路名稱空間: ip netns add
名稱空間內執行命令: ip netns exec
進入名稱空間: ip netns exec bash
1.2 bridge-nf-c
0. 資料流
路徑: pkg/kubelet/kubelet.go
Run函式() ->
syncNodeStatus () ->
registerWithAPIServer() ->
主要是呼叫runtime,這裡預設為docker
0. 資料流
NewMainKubelet(cmd/kubelet/app/server.go) ->
NewKubeGenericRuntimeManager(pkg/kubelet/kuberuntime/kuberuntime
距離上一次寫部落格已經半年多了,這半年發生了很多事情,也有了很多感觸,最主要是改變了忙碌了工作,更加重視身體的健康,為此也把工作地點從深圳這個一線城市換到了珠海,工作相對沒有那麼累,身體感覺也好了很多。所以在工作完成之餘,也有了更多的時間來自我學習和提高,後續會用更多時間來寫更多實用的東西,幫助我們理解
Vue 原始碼分析之proxy代理
當我們在使用Vue進行資料設定時,通常初始化格式為:
let data = {
age: 12,
name: 'yang'
}
// 例項化Vue物件
let vm = new Vue({
data
})
相關推薦
kafka原始碼分析之consumer的原始碼
【kubernetes/k8s原始碼分析】kubelet原始碼分析之cdvisor原始碼分析
spark原始碼分析之Master原始碼主備切換機制分析
Android原始碼分析之Glide原始碼分析&基礎版ImageLoader框架
Mybatis深入原始碼分析之SqlSessionFactoryBuilder原始碼分析
Kafka 原始碼分析之LogSegment
kafka原始碼分析之kafkaApis
kafka原始碼分析之producer
kafka原始碼分析之kafkacluster的管理-KafkaController
分散式訊息佇列RocketMQ原始碼分析之3 -- Consumer負載均衡機制 -- Rebalance
Spark原始碼分析之Spark Shell(上)
Netty 原始碼分析之拆包器的奧祕
Android原始碼分析之為什麼在onCreate() 和 onResume() 獲取不到 View 的寬高
netty原始碼分析之服務端啟動
SNMP原始碼分析之(一)配置檔案部分
【kubernetes/k8s原始碼分析】kubelet原始碼分析之容器網路初始化原始碼分析
【kubernetes/k8s原始碼分析】kubelet原始碼分析之資源上報
【kubernetes/k8s原始碼分析】kubelet原始碼分析之啟動容器
Android系統原始碼分析之-ContentProvider
Vue 原始碼分析之proxy代理