Kafka訊息的壓縮機制
最近在做 AWS cost saving 的事情,對於 Kafka 訊息叢集,計劃通過壓縮訊息來減少訊息儲存所佔空間,從而達到減少 cost 的目的。本文將結合原始碼從 Kafka 支援的訊息壓縮型別、何時需要壓縮、如何開啟壓縮、何處進行解壓縮以及壓縮原理來總結 Kafka 整個訊息壓縮機制。文中所涉及原始碼部分均來自於 Kafka 當前最新的 3.3.0-SNAPSHOT 版本。
Kafka支援的訊息壓縮型別
什麼是 Kafka 的訊息壓縮
在談訊息壓縮型別之前,我們先看下 Kafka 中關於訊息壓縮的定義是什麼。
Kafka 官網 有這樣一段解釋:
此為 Kafka 中端到端的塊壓縮功能。如果啟用,資料將由 producer 壓縮,以壓縮格式寫入伺服器,並由 consumer 解壓縮。壓縮將提高 consumer 的吞吐量,但需付出一定的解壓成本。這在跨資料中心映象資料時尤其有用。
也就是說,Kafka 的訊息壓縮是指將訊息本身採用特定的壓縮演算法進行壓縮並存儲,待消費時再解壓。
我們知道壓縮就是用時間換空間,其基本理念是基於重複,將重複的片段編碼為字典,字典的 key 為重複片段,value 為更短的程式碼,比如序列號,然後將原始內容中的片段用程式碼表示,達到縮短內容的效果,壓縮後的內容則由字典和程式碼序列兩部分組成。解壓時根據字典和程式碼序列可無損地還原為原始內容。注:有失真壓縮不在此次討論範圍。
通常來講,重複越多,壓縮效果越好。比如 JSON 是 Kafka 訊息中常用的序列化格式,單條訊息內可能並沒有多少重複片段,但如果是批量訊息,則會有大量重複的欄位名,批量中訊息越多,則重複越多,這也是為什麼 Kafka 更偏向塊壓縮,而不是單條訊息壓縮。
訊息壓縮型別
目前 Kafka 共支援四種主要的壓縮型別:Gzip、Snappy、Lz4 和 Zstd。關於這幾種壓縮的特性,
壓縮型別 | 壓縮比率 | CPU 使用率 | 壓縮速度 | 頻寬使用率 |
---|---|---|---|---|
Gzip | Highest | Highest | Slowest | Lowest |
Snappy | Medium | Moderate | Moderate | Medium |
Lz4 | Low | Lowest | Fastest | Highest |
Zstd | Medium | Moderate | Moderate | Medium |
從上表可知,Snappy 在 CPU 使用率、壓縮比、壓縮速度和網路頻寬使用率之間實現良好的平衡,我們最終也是採用的該型別進行壓縮試點。這裡值得一提的是,Zstd 是 Facebook 於 2016 年開源的新壓縮演算法,壓縮率和壓縮效能都不錯,具有與 Snappy(Google 傑作)相似的特性,直到 Kafka 的 2.1.0 版本才引入支援。
針對這幾種壓縮本身的效能,Zstd GitHub 官方 公佈了壓測對比結果如下,
Compressor name | Ratio | Compression | Decompress. |
---|---|---|---|
zstd 1.5.1 -1 | 2.887 | 530 MB/s | 1700 MB/s |
zlib 1.2.11 -1 | 2.743 | 95 MB/s | 400 MB/s |
brotli 1.0.9 -0 | 2.702 | 395 MB/s | 450 MB/s |
zstd 1.5.1 --fast=1 | 2.437 | 600 MB/s | 2150 MB/s |
zstd 1.5.1 --fast=3 | 2.239 | 670 MB/s | 2250 MB/s |
quicklz 1.5.0 -1 | 2.238 | 540 MB/s | 760 MB/s |
zstd 1.5.1 --fast=4 | 2.148 | 710 MB/s | 2300 MB/s |
lzo1x 2.10 -1 | 2.106 | 660 MB/s | 845 MB/s |
lz4 1.9.3 | 2.101 | 740 MB/s | 4500 MB/s |
lzf 3.6 -1 | 2.077 | 410 MB/s | 830 MB/s |
snappy 1.1.9 | 2.073 | 550 MB/s | 1750 MB/s |
可以看到 Zstd 可以通過壓縮速度為代價獲得更高的壓縮比,二者之間的權衡可通過 --fast
引數靈活配置。
何時需要壓縮
壓縮是需要額外的 CPU 代價的,並且會帶來一定的訊息分發延遲,因而在壓縮前要慎重考慮是否有必要。筆者認為需考慮以下幾方面:
- 壓縮帶來的磁碟空間和頻寬節省遠大於額外的 CPU 代價,這樣的壓縮是值得的。
- 資料量足夠大且具重複性。訊息壓縮是批量的,低頻的資料流可能都無法填滿一個批量,會影響壓縮比。資料重複性越高,往往壓縮效果越好,例如 JSON、XML 等結構化資料;但若資料不具重複性,例如文字都是唯一的 md5 或 UUID 之類,違背了壓縮的重複性前提,壓縮效果可能不會理想。
- 系統對訊息分發的延遲沒有嚴苛要求,可容忍輕微的延遲增長。
如何開啟壓縮
Kafka 通過配置屬性 compression.type
控制是否壓縮。該屬性在 producer 端和 broker 端各自都有一份,也就是說,我們可以選擇在 producer 或 broker 端開啟壓縮,對應的應用場景各有不同。
在 Broker 端開啟壓縮
compression.type 屬性
Broker 端的 compression.type
屬性預設值為 producer
,即直接繼承 producer 端所發來訊息的壓縮方式,無論訊息採用何種壓縮或者不壓縮,broker 都原樣儲存,這一點可以從如下程式碼片段看出:
class UnifiedLog(...) extends Logging with KafkaMetricsGroup {
...
private def analyzeAndValidateRecords(records: MemoryRecords,
origin: AppendOrigin,
ignoreRecordSize: Boolean,
leaderEpoch: Int): LogAppendInfo = {
records.batches.forEach { batch =>
...
val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
if (messageCodec != NoCompressionCodec)
sourceCodec = messageCodec
}
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec);
}
}
object BrokerCompressionCodec {
val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions: List[String] = brokerCompressionCodecs.map(codec => codec.name)
def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))
def getCompressionCodec(compressionType: String): CompressionCodec = {
compressionType.toLowerCase(Locale.ROOT) match {
case UncompressedCodec.name => NoCompressionCodec
case _ => CompressionCodec.getCompressionCodec(compressionType)
}
}
def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = {
if (ProducerCompressionCodec.name.equals(compressionType))
producerCompression
else
getCompressionCodec(compressionType)
}
}
sourceCodec
為 recordBatch
上的編碼,即表示從 producer 端發來的這批訊息的編碼。 targetCodec
為 broker 端配置的壓縮編碼,從函式 getTargetCompressionCodec
可以看出最終儲存訊息的目標編碼是結合 broker 端的 compressionType
和 producer 端的 producerCompression
綜合判斷的:當 compressionType
為 producer
時直接採用 producer 端的 producerCompression
,否則就採用 broker 端自身的編碼設定 compressionType
。從 brokerCompressionCodecs
的取值可看出,compression.type
的可選值為 [uncompressed, zstd, lz4, snappy, gzip, producer]
。其中 uncompressed
與 none
是等價的,producer
不用多說,其餘四個則是標準的壓縮型別。
broker 和 topic 兩個級別
在 broker 端的壓縮配置分為兩個級別:全域性的 broker 級別 和 區域性的 topic 級別。顧名思義,如果配置的是 broker 級別,則對於該 Kafka 叢集中所有的 topic 都是生效的。但如果 topic 級別配置了自己的壓縮型別,則會覆蓋 broker 全域性的配置,以 topic 自己配置的為準。
broker 級別
要配置 broker 級別的壓縮型別,可通過 configs
命令修改 compression.type
配置項取值。此處要使修改生效,是否需要重啟 broker 取決於 Kafak 的版本,在 1.1.0 之前,任何配置項的改動都需要重啟 broker 才生效,而從 1.1.0 版本開始,Kafka 引入了動態 broker 引數,將配置項分為三類:read-only
、per-broker
和 cluster-wide
,第一類跟原來一樣需重啟才生效,而後面兩類都是動態生效的,只是影響範圍不同,關於 Kafka 動態引數,以後單開博文介紹。從 官網 可以看到,compression.type
是屬於 cluster-wide
的,如果是 1.1.0 及之後的版本,則無需重啟 broker。
topic 級別
topic 的配置分為兩部分,一部分是 topic 特有的,如 partitions 等,另一部分則是預設採用 broker 配置,但也可以覆蓋。如果要定義 topic 級別的壓縮,可以在 topic 建立時通過 --config 選項覆蓋配置項 compression.type
的取值,命令如下:
sh bin/kafka-topics.sh --create --topic my-topic --replication-factor 1 --partitions 1 --config compression.type=snappy
當然也可以通過 configs
命令修改 topic 的 compression.type
取值,命令如下:
bin/kafka-configs.sh --entity-type topics --entity-name my-topic --alter --add-config compression.type=snappy
在 Producer 端壓縮
compression.type 屬性
跟 broker 端一樣,producer 端的壓縮配置屬性依然是 compression.type
,只不過預設值和可選值有所不同。預設值為 none
,表示不壓縮,可選值為列舉類 CompressionType
中所有例項對應 name
的列表。
開啟壓縮的方式
直接在程式碼層面更改 producer 的 config,示例如下。但需要注意的是,改完 config 之後,需要重啟 producer 端的應用程式,壓縮才會生效。
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public KafkaTemplate<byte[], byte[]> kafkaTemplate() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
config.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "3000");
config.put(ProducerConfig.LINGER_MS_CONFIG, "1");
...
// 開啟 Snappy 壓縮
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(config));
}
}
壓縮和解壓的位置
何處會壓縮
可能產生壓縮的地方有兩處:producer 端和 broker 端。
producer 端
producer 端發生壓縮的唯一條件就是在 producer 端為屬性 compression.type
配置了除 none
之外有效的壓縮型別。此時,producer 在向所負責的所有 topics 發訊息之前,都會將訊息壓縮處理。
broker 端
對於 broker 端,產生壓縮的情況就複雜得多,這不僅取決於 broker 端自身的壓縮編碼 targetCodec
是否是需要壓縮的型別,還取決於 targetCodec
跟 producer 端的 sourceCodec
是否相同,除此之外,還跟訊息格式的 magic
版本有關。直接看程式碼,broker 端的訊息讀寫是由 UnifiedLog
負責的,訊息持久化的核心入口是 append
方法,程式碼如下:
class UnifiedLog(...) extends Logging with KafkaMetricsGroup {
...
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
ignoreRecordSize: Boolean): LogAppendInfo = {
...
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
// return if we have no valid messages or if this is a duplicate of the last appended entry
if (appendInfo.shallowCount == 0) appendInfo
else {
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validRecords = trimInvalidBytes(records, appendInfo)
// they are valid, insert them in the log
lock synchronized {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
localLog.checkIfMemoryMappedBufferClosed()
if (validateAndAssignOffsets) {
// assign offsets to the message set
val offset = new LongRef(localLog.logEndOffset)
appendInfo.firstOffset = Some(LogOffsetMetadata(offset.value))
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
topicPartition,
offset,
time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.recordVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
origin,
interBrokerProtocolVersion,
brokerTopicStats,
requestLocal.getOrElse(throw new IllegalArgumentException(
"requestLocal should be defined if assignOffsets is true")))
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
}
...
} else {
// we are taking the offsets we are given
...
}
...
maybeDuplicate match {
case Some(duplicate) =>
...
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
updateHighWatermarkWithLogEndOffset()
...
trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
s"first offset: ${appendInfo.firstOffset}, " +
s"next offset: ${localLog.logEndOffset}, " +
s"and messages: $validRecords")
if (localLog.unflushedMessages >= config.flushInterval) flush(false)
}
appendInfo
}
}
}
}
}
可以看到,先是採用 analyzeAndValidateRecords
在 recordBatch
的維度對批量訊息整體做校驗,比如 CRC、size 等,不會細化到單條訊息,所以這裡不會涉及解壓。這一步通過之後,會採用 LogValidator.validateMessagesAndAssignOffsets
對 recordBatch
以及單條訊息做進一步驗證併為訊息分配 offset
,該過程可能涉及解壓。完成這一步之後,呼叫 localLog.append
方法將訊息追加到本地日誌,這一步才是真正的落盤。我們繼續關注可能發生解壓的 LogValidator
部分,程式碼如下:
private[log] object LogValidator extends Logging {
private[log] def validateMessagesAndAssignOffsets(records: MemoryRecords,
topicPartition: TopicPartition,
offsetCounter: LongRef,
time: Time,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
compactedTopic: Boolean,
magic: Byte,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
partitionLeaderEpoch: Int,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
brokerTopicStats: BrokerTopicStats,
requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(magic))
convertAndAssignOffsetsNonCompressed(records, topicPartition, offsetCounter, compactedTopic, time, now, timestampType,
timestampDiffMaxMs, magic, partitionLeaderEpoch, origin, brokerTopicStats)
else
// Do in-place validation, offset assignment and maybe set timestamp
assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
partitionLeaderEpoch, origin, magic, brokerTopicStats)
} else {
validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec,
targetCodec, compactedTopic, magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin,
interBrokerProtocolVersion, brokerTopicStats, requestLocal)
}
}
...
}
從上可知,當 broker 端配置的壓縮編碼 targetCodec
與所收到的批量訊息的壓縮編碼 sourceCodec
都為 none
即不壓縮時,會再檢查訊息格式的版本,如果與 broker 端配置的版本不同,則需要先將原批量訊息轉換為目標版本 magic
對應格式的新批量訊息,然後再在新批量訊息中分配 offset
;否則直接在原批量訊息中就地分配 offset
,此過程均不涉及解壓縮。這裡稍微解釋下分配 offset
的邏輯,我們知道在 Kafka 中 offset
是 partition
下每條訊息的唯一標識,consumer 端也是根據 offset
來追蹤消費進度,而 offset
的生成和寫入則是在 broker 端,就是此處提到的 offset
分配。理論上說,broker 需要為每條訊息都分配一個 offset
的,但在實踐中,因為用的是 recordBatch
,內部訊息是順序排列的且總記錄數是知道的,而 recordBatch
本身會記錄 baseOffset
,故通常只需設定 lastOffset
即可。唯一的例外是,當因訊息格式轉換或解壓縮而需要建立新的 recordBatch
時,會呼叫 memoryRecordsBuilder
的 appendWithOffset
方法為每一條訊息記錄分配 offset
。
當 targetCodec
與 sourceCodec
至少有一個不為 none
即需要壓縮時,情況就複雜一些,具體邏輯都在 validateMessagesAndAssignOffsetsCompressed
方法中,
private[log] object LogValidator extends Logging {
...
def validateMessagesAndAssignOffsetsCompressed(...): ValidationAndOffsetAssignResult = {
...
// No in place assignment situation 1
var inPlaceAssignment = sourceCodec == targetCodec
var maxTimestamp = RecordBatch.NO_TIMESTAMP
val expectedInnerOffset = new LongRef(0)
val validatedRecords = new mutable.ArrayBuffer[Record]
var uncompressedSizeInBytes = 0
// Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
// One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch is actually
// a single record so we'd need to special handle it by creating a single wrapper batch that includes all the records
val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec)
// No in place assignment situation 2 and 3: we only need to check for the first batch because:
// 1. For most cases (compressed records, v2, for example), there's only one batch anyways.
// 2. For cases that there may be multiple batches, all batches' magic should be the same.
if (firstBatch.magic != toMagic || toMagic == RecordBatch.MAGIC_VALUE_V0)
inPlaceAssignment = false
// Do not compress control records unless they are written compressed
if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch)
inPlaceAssignment = true
records.batches.forEach { batch =>
validateBatch(topicPartition, firstBatch, batch, origin, toMagic, brokerTopicStats)
uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
// if we are on version 2 and beyond, and we know we are going for in place assignment,
// then we can optimize the iterator to skip key / value / headers since they would not be used at all
val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
batch.skipKeyValueIterator(requestLocal.bufferSupplier)
else
batch.streamingIterator(requestLocal.bufferSupplier)
try {
val recordErrors = new ArrayBuffer[ApiRecordError](0)
// this is a hot path and we want to avoid any unnecessary allocations.
var batchIndex = 0
recordsIterator.forEachRemaining { record =>
val expectedOffset = expectedInnerOffset.getAndIncrement()
val recordError = validateRecordCompression(batchIndex, record).orElse {
validateRecord(batch, topicPartition, record, batchIndex, now,
timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
if (record.timestamp > maxTimestamp)
maxTimestamp = record.timestamp
// Some older clients do not implement the V1 internal offsets correctly.
// Historically the broker handled this by rewriting the batches rather
// than rejecting the request. We must continue this handling here to avoid
// breaking these clients.
if (record.offset != expectedOffset)
inPlaceAssignment = false
}
None
}
}
recordError match {
case Some(e) => recordErrors += e
case None =>
uncompressedSizeInBytes += record.sizeInBytes()
validatedRecords += record
}
batchIndex += 1
}
processRecordErrors(recordErrors)
} finally {
recordsIterator.close()
}
}
if (!inPlaceAssignment) {
val (producerId, producerEpoch, sequence, isTransactional) = {
// note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
// there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
// with older magic versions, there will never be a producer id, etc.
val first = records.batches.asScala.head
(first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
}
buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec),
now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch,
uncompressedSizeInBytes)
} else {
// we can update the batch only and write the compressed payload as is;
// again we assume only one record batch within the compressed set
val batch = records.batches.iterator.next()
val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
batch.setLastOffset(lastOffset)
if (timestampType == TimestampType.LOG_APPEND_TIME)
maxTimestamp = now
if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
batch.setMaxTimestamp(timestampType, maxTimestamp)
if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
ValidationAndOffsetAssignResult(validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = lastOffset,
messageSizeMaybeChanged = false,
recordConversionStats = recordConversionStats)
}
}
...
}
可以看到,inPlaceAssignment
是用於標識是否可以原地修改 recordBatch
來分配 offset
,有三種情況不能原地修改:
- sourceCodec 和 targetCodec 不同,這個比較好理解,編碼不同,構建目標 payload 時原
recordBatch
自然不能複用。 - 目標訊息格式版本
magic
與 broker 接收到的recordBatch
的magic
不同,此時需要訊息格式轉換,需要構建新的recordBatch
,這個跟第一種情況是一樣的,無法複用原recordBatch
。 - 目標訊息格式版本為
V0
,因為老版本V0
格式的訊息,需要為每條訊息重新分配絕對offset
,無法複用原recordBatch
。
此時,inPlaceAssignment
為 false,直接走 buildRecordsAndAssignOffsets
邏輯來構建新的 recordBatch
,此時是否壓縮取決於 targetCodec
,如果不為none
,則此處會按照 targetCodec
編碼進行壓縮。
除了上述三種情況之外,都是可以原地修改,此時可以直接複用原 recordBatch
來構建目標訊息的 payload,此時不存在壓縮處理。
何處會解壓
可能發生解壓的地方依然是兩處:consumer 端和 broker 端。
consumer 端
consumer 端發生解壓的唯一條件就是從 broker 端拉取到的訊息是帶壓縮的。此時,consumer 會根據 recordBatch
中 compressionType
來對訊息進行解壓,具體細節後面原始碼分析部分會講。
broker 端
broker 端是否發生解壓取決於 producer 發過來的批量訊息 recordBatch
是否是壓縮的:如果 producer 開啟了壓縮,則會發生解壓,否則不會。原因簡單說下,在 broker 端持久化訊息前,會對訊息做各種驗證,此時必然會迭代 recordBatch
,而在迭代的過程中,會直接採用 recordBatch
上的 compressionType
對訊息位元組流進行處理,是否解壓取決於 compressionType
是否是壓縮型別。關於這點,可以在 LogValidator
的 validateMessagesAndAssignOffsets
方法實現中可以看到,在 convertAndAssignOffsetsNonCompressed
、assignOffsetsNonCompressed
和 validateMessagesAndAssignOffsetsCompressed
三個不同的分支中,都會看到 records.batches.forEach {...}
的身影,而在後面的原始碼分析中會發現,在 recordBatch
的迭代器邏輯中,直接採用的 compressionType
的解壓邏輯對訊息位元組流讀取的。也就是說,如果 recordBatch
是壓縮的 ,只要對其進行了迭代訪問,則會自動觸發解壓邏輯。
壓縮和解壓原理
壓縮和解壓涉及到幾個關鍵的類:CompressionType
、MemoryRecordsBuilder
、DefaultRecordBatch
、AbstractLegacyRecordBatch
。其中 CompressionType
是壓縮相關的列舉,集壓縮定義和實現為一體;MemoryRecordsBuilder
是負責將新的訊息資料寫入記憶體 buffer,即呼叫 CompressionType
中的壓縮邏輯 wrapForOutput
來寫入訊息;而 DefaultRecordBatch
和 AbstractLegacyRecordBatch
則是負責讀取訊息資料,即呼叫 CompressionType
的解壓邏輯 wrapForInput
將訊息還原為無壓縮資料。只不過二者區別是,前者是用於處理新版本格式的訊息(即 magic >= 2
),而後者則是處理老版本格式的訊息(即 magic 為 0 或 1
)。
CompressionType
在說 CompressionType
之前,我們先看下 CompressionCodec
這個 Scala 指令碼。
CompressionCodec
部分原始碼如下,
...
case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 1
val name = "gzip"
}
case object SnappyCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 2
val name = "snappy"
}
case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 3
val name = "lz4"
}
case object ZStdCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 4
val name = "zstd"
}
case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 0
val name = "none"
}
case object UncompressedCodec extends BrokerCompressionCodec {
val name = "uncompressed"
}
case object ProducerCompressionCodec extends BrokerCompressionCodec {
val name = "producer"
}
該指令碼定義了 GZIPCompressionCodec
等共 7 個 case object,可類比於 Java 中列舉,這些 case object 中的 name
集合則剛好覆蓋了前文所提到的屬性 compression.type
的所有可選值,包括 producer 端和 broker 端的。而與 name
繫結在一起的 codec
則是最終真正寫入訊息體的壓縮編碼,name
只是為了可讀性友好。從上可知,壓縮編碼codec
的有效取值只有 0~4
,分別對應 none
、gzip
、snappy
、lz4
和zstd
,而這五種取值恰好是 CompressionType
中定義的五種列舉常量。
由此可知,CompressionCodec
是面向配置屬性 compression.type
的可選值的,並將數值化的壓縮編碼 codec
對映為可讀性強的 name
;而 CompressionType
則是定義了與壓縮編碼對應的列舉常量,二者通過 name
關聯。
CompressionType 原始碼
CompressionType
定義了與壓縮編碼對應的五種壓縮型別列舉,並且通過用於壓縮的 wrapForOutput
和用於解壓的 wrapForInput
這兩個抽象方法將每種壓縮型別與對應的壓縮實現繫結在一起,既避免了常規的 if-else
判斷,也將壓縮的定義與實現完全收斂到 CompressionType
,符合單一職責原則。其實類似這種優雅的設計在 JDK 中也能經常看到其身影,比如 TimeUnit
。直接看原始碼,
public enum CompressionType {
...
GZIP(1, "gzip", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
} catch (Exception e) {
throw new KafkaException(e);
}
}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
// Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
// 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
// number of bytes (potentially a single byte)
return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
16 * 1024);
} catch (Exception e) {
throw new KafkaException(e);
}
}
},
...
ZSTD(4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
return ZstdFactory.wrapForOutput(buffer);
}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
}
};
...
// Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
// Wrap buffer with an InputStream that will decompress data with this CompressionType.
public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
...
}
每種壓縮型別對於 wrapForOutput
和 wrapForInput
兩方法的具體實現已經很清楚地闡述了壓縮和解壓的方式,感興趣的朋友可以從該入口 step in
一探究竟。這裡就不細述。當然這只是處理壓縮最小的基本單元,為了搞清楚 Kafka 在何處使用它,還得繼續看其他幾個核心類。
在此之前,就上述原始碼,拋開本次主題,我還想談幾個值得學習借鑑的細節,
Snappy
和Zstd
都是用的XXXFactory
靜態方法來構建 Stream 物件,而其他的比如Lz4
則都是直接通過new
建立的物件。之所以這麼做,我們進一步step in
就會發現,對於Snappy
和Zstd
,Kafka 都是直接依賴的第三方庫,而其他的則是 JDK 或 Kafka 自己的實現。為了減少第三方庫的副作用,通過此方式將第三方庫的類的惰性載入做到極致,這也體現出作者對 Java 類載入時機的充分理解,很精緻的處理。Gzip
的wrapForInput
實現中,在 KAFKA-6430 這個 Improvement 提交中,input buffer 從 0.5 KB 調大到 8 KB,其目的就是能夠在一次 Gzip 壓縮中處理更多的位元組,以獲得更高的效能。至少,從 commit 的描述上看,throughput 能翻倍。- 抽象方法
wrapForInput
中暴露的最後一個 BufferSupplier型別的引數decompressionBufferSupplier
,正如方法的引數說明所言,對於比較小的批量訊息,如果在wrapForInput
內部新建 buffer,那麼每次方法呼叫都會新分配buffer,這可能比壓縮處理本身更耗時,所以該引數給了一個選擇的機會,在外面分配記憶體,然後方法內迴圈利用。在日常的編碼中,對於迴圈中所需的空間,我也經常會思考是每次新建好還是先在外面分配,然後內部迴圈利用更好,case by case.
MemoryRecordsBuilder
public class MemoryRecordsBuilder implements AutoCloseable {
...
// Used to append records, may compress data on the fly
private DataOutputStream appendStream;
...
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit,
long deleteHorizonMs) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic > 0");
if (magic < RecordBatch.MAGIC_VALUE_V2) {
if (isTransactional)
throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
if (isControlBatch)
throw new IllegalArgumentException("Control records are not supported for magic " + magic);
if (compressionType == CompressionType.ZSTD)
throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
if (deleteHorizonMs != RecordBatch.NO_TIMESTAMP)
throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic);
}
...
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
...
}
public void close() {
...
if (numRecords == 0L) {
buffer().position(initialPosition);
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
else if (compressionType != CompressionType.NONE)
this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
buffer.position(initialPosition);
builtRecords = MemoryRecords.readableRecords(buffer.slice());
}
}
...
private int writeDefaultBatchHeader() {
...
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
hasDeleteHorizonMs(), partitionLeaderEpoch, numRecords);
buffer.position(pos);
return writtenCompressed;
}
private int writeLegacyCompressedWrapperHeader() {
...
int wrapperSize = pos - initialPosition - Records.LOG_OVERHEAD;
int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic);
AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize);
long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : maxTimestamp;
LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
buffer.position(pos);
return writtenCompressed;
}
}
可以看到,appendStream
是用於追加訊息到記憶體 buffer 的,直接採用的 compressionType
的壓縮邏輯來構建寫入流的,如果此處 compressionType
屬於非 none
的有效壓縮型別,則會產生壓縮。此外,從上面 magic
的判斷邏輯可知,訊息的時間戳型別是從大版本 V1
開始支援的;而事務訊息、控制訊息、Zstd 壓縮和 deleteHorizonMs
都是從 V2
才開始支援的。這裡的 V1
、V2
對應訊息格式的版本,其中 V1
是從 0.10.0 版本開始引入的,在此之前都是 V0
版本,而 V2
則是從 0.11.0 版本開始引入,直到現在的最新版依然是 V2
。
從 close()
方法可以看出,MemoryRecordsBuilder
在構建 memoryRecords
時,會根據訊息格式的版本高低,寫入不同的 Header。對於新版訊息,在 writeDefaultBatchHeader
方法中直接呼叫 DefaultRecordBatch.writeHeader(...)
寫入新版訊息特定的 Header;而對於老版訊息,則是在 writeLegacyCompressedWrapperHeader
方法中呼叫 AbstractLegacyRecordBatch.writeHeader
和 LegacyRecord.writeCompressedRecordHeader
寫入老版訊息的 Header。雖然 Header 的格式各不相同,但我們在兩種 Header 中都可以看到 compressionType
的身影,以此可見,Kafka 是允許多種版本的訊息共存的,以及壓縮與非壓縮訊息的共存,因為這些資訊是儲存在 recordBatch
上的,是批量訊息級別。
DefaultRecordBatch
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
...
@Override
public Iterator<Record> iterator() {
if (count() == 0)
return Collections.emptyIterator();
if (!isCompressed())
return uncompressedIterator();
// for a normal iterator, we cannot ensure that the underlying compression stream is closed,
// so we decompress the full record set here. Use cases which call for a lower memory footprint
// can use `streamingIterator` at the cost of additional complexity
try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
List<Record> records = new ArrayList<>(count());
while (iterator.hasNext())
records.add(iterator.next());
return records.iterator();
}
}
...
}
RecordBatch
是表示批量訊息的介面,對於老版格式的訊息(版本 V0
和 V1
),如果沒有壓縮,只會包含單條訊息,否則可以包含多條;而新版格式訊息(版本 V2
及以上)無論是否壓縮,都是通常包含多條訊息。且該介面中有一個 compressionType()
方法來標識該 batch 的壓縮型別,它會作為讀訊息時解壓的判斷依據。而上面的 DefaultRecordBatch
則是該介面的針對新版本格式訊息的預設實現,它也實現了 Iterable<Record>
介面,因而 iterator()
是訪問批量訊息的核心邏輯,當 compressionType()
返回 none
時,表示不壓縮,直接返回非壓縮迭代器,此處跳過,當有壓縮時,走的是壓縮迭代器,具體實現如下,
public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
final ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
}
private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
final DataInputStream inputStream = recordInputStream(bufferSupplier);
if (skipKeyValue) {
// this buffer is used to skip length delimited fields like key, value, headers
byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
return new StreamRecordIterator(inputStream) {
...
}
} else {
...
}
}
我們可以看到,compressedIterator()
在構造 Stream 迭代器之前,呼叫了 recordInputStream(...)
,該方法中通過 compressionType
的解壓邏輯對原資料進行了解壓。
AbstractLegacyRecordBatch
public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch implements Record {
...
CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
if (isCompressed())
return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier);
return new CloseableIterator<Record>() {
private boolean hasNext = true;
@Override
public void close() {}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public Record next() {
if (!hasNext)
throw new NoSuchElementException();
hasNext = false;
return AbstractLegacyRecordBatch.this;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
...
private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
boolean ensureMatchingMagic,
int maxMessageSize,
BufferSupplier bufferSupplier) {
LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
this.wrapperMagic = wrapperRecord.magic();
if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1)
throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);
CompressionType compressionType = wrapperRecord.compressionType();
if (compressionType == CompressionType.ZSTD)
throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic);
ByteBuffer wrapperValue = wrapperRecord.value();
if (wrapperValue == null)
throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
wrapperMagic + ")");
InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
...
}
}
}
AbstractLegacyRecordBatch
跟前面的 DefaultRecordBatch
大同小異,同樣也是 iterator()
入口,當開啟了壓縮時,返回壓縮迭代器 DeepRecordsIterator
,只是名字不同而已,迭代器內部依然是直接通過 compressionType
的解壓邏輯對資料流進行解壓。
原文首發於:https://www.yangbing.club/2022/04/30/compression-mechanism-of-the-Kafka-message/