Kafka—Storm之KafkaSpout和KafkaBolt原始碼解釋
轉載來自:http://blog.csdn.net/ransom0512/article/details/50497261
另一個比較詳細的KafkaSpout詳解見:http://www.cnblogs.com/cruze/p/4241181.html
Storm-Kafka原始碼解析
說明:本文所有程式碼基於Storm 0.10版本,本文描述內容只涉及KafkaSpout和KafkaBolt相關,不包含trident特性。
Kafka Spout
KafkaSpout的建構函式如下:
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
}
- 1
- 2
- 3
- 1
- 2
- 3
其構造引數來自於SpoutConfig物件,Spout中用到的所有引數都來自於該物件。該物件引數說明如下:
SpoutConfig
SpoutConfig繼承自KafkaConfig。兩個類內部所有引數及說明如下:
/**
* Kafka地址和分割槽關係對應資訊
* 在kafka的分割槽資訊和地址資訊都很清楚的情況下,可以以直接使用StaticHosts
* 但是該物件引數很難構建,需要的資訊很多,所以我們一般情況下並不使用它。
* 我們主要用的是ZKHosts的例項。可以在其中設定Zookeeper地址等資訊,然後動態獲取kafka元資料
* ZKHost的引數資訊見下面一段。
* 必選引數
**/
public final BrokerHosts hosts;
/**
* 要從kafka中讀取的topic佇列名稱
* 必選引數
**/
public final String topic;
/**
* Kafka的客戶端id引數,該引數一般不需要設定
* 預設值為kafka.api.OffsetRequest.DefaultClientId()
* 空字串
**/
public final String clientId;
/**
* Kafka Consumer每次請求獲取的資料量大小
* 每次獲取的資料消費完畢之後,才會再獲取資料
* 預設1MB
**/
public int fetchSizeBytes = 1024 * 1024;
/**
* Kafka SimpleConsumer 客戶端和服務端連線的超時時間
* 單位:毫秒
**/
public int socketTimeoutMs = 10000;
/**
* Consumer每次獲取資料的超時時間
* 單位:毫秒
**/
public int fetchMaxWait = 10000;
/**
* Consumer通過網路IO獲取資料的socket buffet大小,
* 預設1MB
**/
public int bufferSizeBytes = 1024 * 1024;
/**
* 該引數有兩個作用:
* 1:申明輸出的資料欄位 declareoutputFileds
* 2:對從kafka中讀到的資料進行反序列化,即將byte位元組陣列轉為tuple物件。
* 對kafka存入資料的key和message都比較關心的,可以使用KeyValueSchemeAsMultiScheme,
* 如果不關心,可以使用SchemeAsMultiScheme
* 預設介面實現一般都只會輸出一個欄位或者兩個欄位,很多時候,我們需要直接從kafka中讀取到資料之後,就將每個欄位解析了,然後進行簡單處理再emit
* 這個時候,建議自己實現MultiScheme介面
* 必選引數
**/
public MultiScheme scheme = new RawMultiScheme();
/**
* 在拓撲提交之後,KafkaSpout會從zookeeper中讀取以前的offset值,以便沿著上次位置繼續讀取資料。
* KafkaSpout會檢查拓撲ID和zookeeper中儲存的拓撲id是否相同。
* 如果不同,並且ignoreZkOffsets=true,那麼就會從startOffsetTime引數位置讀取資料
* 否則,沿著zookeeper中儲存的offset位置繼續讀取資料。
* 也就是說,當ignoreZkOffsets=true的時候,kafkaspout只能保證在拓撲不殺掉的情況下,當worker程序異常退出的時候,會沿著上次讀取位置繼續讀取資料,當拓撲重新提交的時候,就會從佇列最早位置開始讀取資料。
* 這樣就會存在重複讀取資料的問題,所以正式場景,該引數還是應該設定為false。以保證任何場景資料的只被讀取一次。
**/
public boolean ignoreZkOffsets = false;
/**
* 拓撲第一次提交,zookeeper中沒有儲存對應offset的情況下,預設從kafka中讀取的offset位置。預設從佇列最早位置開始讀取資料,即從佇列最開始位置讀取資料。
**/
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
/**
*
* 如果當前的(offset值-failed offsets中最小值) < maxOffsetBehind
* 那麼就會清理failed列表中所有大於maxOffsetBehind的offset值。
* 這是為了防止failed過多,重發太多導致記憶體溢位
* 不過預設為了保證資料不丟失,所以maxOffsetBehind設定的最大
**/
public long maxOffsetBehind = Long.MAX_VALUE;
/**
* 當KafkaSpout初始化之後,使用從zookeeper中讀取的上次記錄的offset
* 從kafka中獲取資料失敗,返回offsetOutofRange錯誤之後,
* 是否使用startOffset從佇列最早位置重新獲取資料。
* offsetOutofrange一般發生在topic被重建,分片被刪除的場景。
**/
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
/**
* metric監控資訊採集間隔
**/
public int metricsTimeBucketSizeInSecs = 60;
/**
* KafkaSpout儲存offset的zookeeper所在地址
* 獨立出來這個屬性是為了防止offset儲存位置不在kafka叢集中
* 如果kafka和storm在一個叢集,該屬性可以忽略
**/
public List<String> zkServers = null;
/**
* KafkaSpout儲存offset的zookeeper埠
* 如果kafka和storm在一個叢集,該屬性可以忽略
**/
public Integer zkPort = null;
/**
* offset在zookeeper中儲存的路徑
* 路徑計算方式為:${zkRoot}/${id}/${partitionId}
* 必選引數
**/
public String zkRoot = null;
/**
* kafkaSpout儲存offset的不同客戶端區分標誌
* 建議每個拓撲使用固定的,不同的引數,以保證拓撲重新提交之後,可以從上次位置繼續讀取資料
* 如果兩個拓撲公用同一個id,那麼可能會被重複讀取
* 如果在拓撲中使用了動態生成的uuid來作為id,那麼每次提交的拓撲,都會從佇列最開始位置讀取資料
* 必選引數
**/
public String id = null;
/**
* offset重新整理到zookeeper中的時間間隔
* 單位:毫秒
**/
public long stateUpdateIntervalMs = 2000;
/**
* 資料傳送失敗之後重試策略相關引數
**/
public long retryInitialDelayMs = 0;
/**
* 資料傳送失敗之後重試策略相關引數
**/
public double retryDelayMultiplier = 1.0;
/**
* 資料傳送失敗之後重試策略相關引數
**/
public long retryDelayMaxMs = 60 * 1000;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
ZKHost中儲存了kafka叢集所在的zookeeper地址等資訊
ZKHost
/**
* kafka叢集zookeeper地址,允許包含chroot
* 比如:192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181/kafka
**/
public String brokerZkStr = null;
/**
* kafka叢集中broker元資料所在地址
* 預設為/brokers
* 如果配置了chroot,那麼就是/kafka/brokers
* 這個和kakfa服務端配置預設是一樣的,如果服務端採用預設配置,該屬性也可以使用預設值
**/
public String brokerZkPath = null; // e.g., /kafka/brokers
/**
* kafka broker分割槽資訊重新整理時間間隔,
* 單位:秒
* 當kafka有broker節點重啟或者分割槽資訊發生變化而導致資料讀取失敗的時候,
* 都會重新觸發一次分割槽資訊重新整理
**/
public int refreshFreqSecs = 60;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
KafkaSpout初始化
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
Map stateConf = new HashMap(conf);
/*
* offset儲存位置的zookeeper地址
* 如果該地址為空,則預設使用Storm叢集的zookeeper
*/
List<String> zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
//儲存offset資訊到zookeeper
_state = new ZkState(stateConf);
//kafka叢集的聯結器
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
//從zookeeper中讀取kafka的broker資訊,只儲存自身例項需要用到的分割槽資訊
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
//兩個metrics監控資訊,忽略
context.registerMetric("kafkaOffset", new IMetric() { ...}, _spoutConfig.metricsTimeBucketSizeInSecs);
context.registerMetric("kafkaPartition", new IMetric() {...}, _spoutConfig.metricsTimeBucketSizeInSecs);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
以上是kafkaSpout的初始化方法,主要是完成對自身管理分割槽資訊的重新整理。
這裡有一個問題,就是會建立3個zookeeper客戶端連線,一個用來從kafka中讀取資料,一個儲存offset,一個是metrics監控資訊,每個zookeeper客戶端連線會建立3個執行緒,這樣,光一個kafkaSpout就會存在9個zookeeper執行緒!當worker程序中有多個spout例項的時候,就會產生更多的執行緒,這就會很消耗效能,這個還是建議對zookeeper連線進行合併處理。
系統通過KafkaUtils.calculatePartitionsForTask方法來獲取自己需要管理的分割槽列表:
for (int i = taskIndex; i < numPartitions; i += totalTasks) {
Partition taskPartition = partitions.get(i);
taskPartitions.add(taskPartition);
}
- 1
- 2
- 3
- 4
- 1
- 2
- 3
- 4
其中,taskIndex就對應自身spout例項的序號,比如該spout併發度為3,那麼這個spout例項就可能為0,1,2。當kafka的topic有5個分割槽的時候,第一個spout例項管理0,3的分割槽;第二個spout例項管理編號為1,4的分割槽,第三個spout例項管理編號為2的分割槽。
taskId儲存在Spout的Open方法的context引數中。context.getThisTaskIndex()
KafkaSpout從Kafka中如何讀取資料併發送
kafkaSpout主要在nextTuple方法中讀取資料並emit。
public void nextTuple() {
//獲取自身例項管理的分割槽列表
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
try {
//_currPartitionIndex永遠小於manager的大小
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
//獲取資料並emit
EmitState state = managers.get(_currPartitionIndex).next(_collector);
/*
* 檢查此次資料傳送狀態
* 如果沒有取到資料或者取到的資料都已經emit完畢
* 那麼就增加_currPartitionIndex值,然後就可以從下個分割槽中讀取資料了。
*/
if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
}
/*
* 如果還有資料沒有emit,就退出此次迴圈,等待下次nexttuple呼叫
* 然後仍然從當前分割槽中取獲取資料並emit
*/
if (state != EmitState.NO_EMITTED) {
break;
}
} catch (FailedFetchException e) {
LOG.warn("Fetch failed", e);
_coordinator.refresh();
}
}
//定期儲存offset資料到zookeeper
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
資料傳送狀態EmitState一共有三種狀態
- EMITTED_MORE_LEFT
上次取到的資料還沒有emit完畢- EMITTED_END,
上次取到的資料已經全部emit完畢- NO_EMITTED
本次沒有取到資料,沒有可供emit的資料
再來看下PartitionManager.next方法,裡面就包含如何獲取資料已經如何emit
public EmitState next(SpoutOutputCollector collector) {
//如果等待發送的佇列為空,那麼就從kafka中再取一次資料
if (_waitingToEmit.isEmpty()) {
fill();
}
while (true) {
//從等待發送的佇列中獲取第一個資料
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
//如果沒有可供傳送的資料,那麼返回emit狀態為沒有可以emit的資料
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
//根據KeyValueSchemeAsMultiScheme介面實現,將kafka中取到的資料轉為tuple
Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if (tups != null) {
//傳送所有的tuple,因為kafka一條資料可能對應storm的多條
for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
}
break;
} else {
//如果tuple轉化失敗,返回null,直接告訴storm該條已經處理成功,即忽略資料錯誤
ack(toEmit.offset);
}
}
/*
* 每次從等待佇列中取一條資料反序列化並emit,
* 然後判斷等待佇列是否還有資料,
* 如果還有資料,就告訴spout,資料還沒有傳送完,不要切換分割槽
* 如果資料已經發送完畢,就告訴spout,資料已經發送完畢,可以切換到下個分割槽了。
*/
if (!_waitingToEmit.isEmpty()) {
return EmitState.EMITTED_MORE_LEFT;
} else {
return EmitState.EMITTED_END;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
當有資料傳送失敗的時候,失敗的資料又會重新加入到_waitingToEmit佇列中,這樣就會產生一個問題,就是當資料傳送失敗的時候,kakfaSpout會永遠只讀一個分割槽,前天分割槽都不會讀取,從而產生資料消費不均勻的問題。
在0.9.6以前老版本的時候喲一個問題,就是當較多資料emit失敗的時候,會有很多的資料在不斷重試,然後重試不斷超時,又不斷重新加入重試列表,從而導致一個數據傳送的死迴圈。這個問題也就是offset超時的問題。見Storm-643, 這個問題目前在最新版本中已經解決。
KafkaBolt
KafkaBolt就比較簡單,0.10版本還是使用old Producer API。
Storm所有的配置屬性,都在kafka.broker.properties中儲存著,這就要求在submitTopology的時候,在topologyConf中再put一個kafka.broker.properties屬性,形成一個map中套map的結構。這樣有一點不好的就是一個拓撲中資料只能寫到一個kafka叢集中,不支援同事寫到多個kafka叢集中。不過這個在0.11新版本中已經解決了,kafka.broker.properties被作為了一個區域性變數,可以在不同的bolt例項中儲存不同的配置屬性。
資料寫入方法如下:
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
collector.ack(input);
return; // Do not try to send ticks to Kafka
}
K key = null;
V message = null;
String topic = null;
try {
//訊息的鍵值,不同的值在kafka中對應不同的分發方式,這個在KafkaBolt的FAQ中有介紹。
key = mapper.getKeyFromTuple(input);
//訊息體
message = mapper.getMessageFromTuple(input);
//topic名稱
topic = topicSelector.getTopic(input);
if(topic != null ) {
producer.send(new KeyedMessage<K, V>(topic, key, message));
} else {
LOG.warn("skipping key = " + key + ", topic selector returned null.");
}
collector.ack(input);
} catch (Exception ex) {
collector.reportError(ex);
collector.fail(input);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
Storm-Kafka FAQ
KafkaSpout
- KafkaSpout excutor數量和Kafka topic分割槽數量的關係
當executor併發度大於topic數量的時候,就會存在有的spout例項可以讀到資料, 有的spout例項讀不到資料。
當executor併發度小於topic數量的時候,就會存在一個spout例項對應多個分割槽的情況;kafka會先從一個分割槽中取一次資料,當這次獲取的資料emit完畢之後,就會再從下個分割槽中取資料。
當executor併發度等於topic數量的時候,一個spout例項對應一個分割槽。在實際應用中,我們也推薦這種配置方式。- 如何從kafka中讀取資料,每次讀取多少資料
根據fetchSizeBytes引數的配置,預設每次取1MB資料。- 資料讀取失敗如何處理
KafkaSpout每個PartitionManager內部儲存一個重試佇列,當資料傳送失敗的時候,加入重試佇列,然後重新發送,直到成功為止。
通過maxOffsetBehind引數來解決failed數量過多導致記憶體溢位問題。- Topic不存在如何處理
直接報錯。- 拓撲重新提交,會不會接著上次位置繼續讀取資料
重新提交的時候,只要id這個引數不變,那麼就會沿著上次位置繼續讀取資料。- zookeeper中儲存的kafka的offset位置有錯誤怎麼辦?
會丟擲offsetOutofRange異常,然後預設從kafka分割槽佇列最早位置開始讀取資料。- 能不能在一個spout中從多個topic讀取資料?
在0.10版本不行,在0.11版本中,支援按照正則方式匹配topic名稱,可以從所有滿足正則條件的topic中讀取資料。- topic分割槽主備資訊發生變化,如何處理
丟擲異常,然後馬上更新分割槽資訊,再次讀取資料。
KafkaBolt
- 寫入資料,kafka topic不存在怎麼辦?
如果kakfa服務端允許自動建立topic,那麼就會自動建立topic。
如果不允許自動建立,那麼就會丟擲異常- 如何寫資料到指定分割槽?
取決於tupleToKafkaMapper的介面實現。
kafka 0.10版本使用的是old producer的API,0.11版本使用的是new Producer的API
對於old Producer
如果key == null,那麼在kafka中,會隨機寸照一個分割槽去寫入資料,之後只要不重啟,就都會往這個分割槽寫入資料
如果key != null,那麼就會在寫入資料的時候,以utils.abs(key.hashCode)%numPartitions規則計算分割槽id
對於New Producer
如果key = null,那麼就會使用一個遞增的int值,每次傳送資料的時候遞增,然後執行utils.abs(nextValue)%availablePartitions.size(),資料寫入會比較均衡。
如果key != null,那麼就會按照Utils.abs(Utils.murmur2(record.key()))%numPartitions的規則計算分割槽。
當然,New Producer API也可以手工指定分割槽id。