kafka producer batch 傳送訊息
阿新 • • 發佈:2019-01-06
1. 使用 KafkaProducer 傳送訊息,是按 batch 傳送的,producer 首先把訊息放入 ProducerBatch 中:
org.apache.kafka.clients.producer.internals.ProducerBatch
2. KafkaProduer 類中有一個 Thread 屬性,負責 IO,傳送和接收資料:
this.sender = new Sender(logContext, client, this.metadata,this.accumulator, maxInflightRequests == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM,this.requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start();
Sender 類實現了 Runnable 介面,封裝了具體的邏輯,傳送訊息和接收響應都在這個類中。
// 傳送訊息 long pollTimeout = sendProducerData(now); // 接收響應 client.poll(pollTimeout, now);
3. 執行回撥
org.apache.kafka.clients.producer.internals.Sender#completeBatch()