1. 程式人生 > >kafka producer batch 發送消息

kafka producer batch 發送消息

接收 ons clas read poll 封裝 ransac version fig

1. 使用 KafkaProducer 發送消息,是按 batch 發送的,producer 首先把消息放入 ProducerBatch 中:
org.apache.kafka.clients.producer.internals.ProducerBatch

2. KafkaProduer 類中有一個 Thread 屬性,負責 IO,發送和接收數據:
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    
this.accumulator, maxInflightRequests == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM,
this.requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this
.sender, true); this.ioThread.start();

Sender 類實現了 Runnable 接口,封裝了具體的邏輯,發送消息和接收響應都在這個類中。

// 發送消息
long pollTimeout = sendProducerData(now);
// 接收響應
client.poll(pollTimeout, now);

3. 執行回調

org.apache.kafka.clients.producer.internals.Sender#completeBatch()

kafka producer batch 發送消息