
The Kafka producer's batch.size and linger.ms


1. The question

batch.size and linger.ms are two parameters with a significant impact on Kafka producer performance. batch.size is the basic unit of the producer's batched sends; it defaults to 16384 bytes, i.e. 16 KB. linger.ms is the parameter the sender thread uses, when checking whether a batch is ready, to decide whether the batch has waited long enough; it defaults to 0 ms.

So does the producer send messages in batches once batch.size is reached, or at intervals of linger.ms? The conclusion up front: as soon as either batch.size or linger.ms is satisfied, the producer starts sending.
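To make the two knobs concrete, here is a minimal configuration sketch using only java.util.Properties. The config keys are the standard Kafka producer names; the broker address and the chosen values (32 KB, 5 ms) are placeholders for illustration only.

```java
import java.util.Properties;

public class ProducerBatchConfig {
    public static Properties buildProps() {
        Properties props = new Properties();
        // Placeholder broker address for illustration.
        props.put("bootstrap.servers", "localhost:9092");
        // Send a batch once it reaches 32 KB...
        props.put("batch.size", "32768");
        // ...or once the oldest record in it has waited 5 ms, whichever comes first.
        props.put("linger.ms", "5");
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildProps();
        System.out.println(p.getProperty("batch.size") + "," + p.getProperty("linger.ms"));
    }
}
```

These properties would be passed to the KafkaProducer constructor; raising linger.ms trades a little latency for larger, more efficient batches.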

2. Source code analysis

First, the main body of the sender thread is shown below; we mainly care about where the sender thread blocks:

void run(long now) {
        Cluster cluster = metadata.fetch();

        // result.nextReadyCheckDelayMs is the delay until the next ready check,
        // which is also how long the selector will block
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);

        // for now we only care about result.nextReadyCheckDelayMs
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }

        for (ClientRequest request : requests)
            client.send(request, now);

        // poll eventually calls the selector; pollTimeout is how long the selector blocks
        this.client.poll(pollTimeout, now);
    }

Selector#select(..):

private int select(long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");

        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }

We can start the analysis from the moment a new KafkaProducer is instantiated (before send has ever been called). When the sender thread calls accumulator#ready(..), it returns a result containing the time the selector may block. Since send has not been called yet, Deque&lt;RecordBatch&gt; is empty, so the nextReadyCheckDelayMs in the result is the maximum value (Long.MAX_VALUE), and the selector blocks indefinitely.

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<Node>();
         // initialized to the maximum value
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;

        boolean exhausted = this.free.queued() > 0;
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;

                        // related to linger.ms
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }
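The decision the loop above makes for each batch can be distilled into a standalone sketch. The names sendable and timeLeftMs below are ours, not Kafka's, and the sketch keeps only the two conditions this article focuses on (full and expired), dropping the backoff/exhausted/closed cases:

```java
public class BatchReadyCheck {
    /** True if the batch should be sent now: it is full, or it has lingered long enough. */
    static boolean sendable(boolean full, long waitedTimeMs, long lingerMs) {
        boolean expired = waitedTimeMs >= lingerMs;
        return full || expired;
    }

    /** How long the sender may still sleep before this batch expires
     *  (the idea behind nextReadyCheckDelayMs). */
    static long timeLeftMs(long waitedTimeMs, long lingerMs) {
        return Math.max(lingerMs - waitedTimeMs, 0);
    }

    public static void main(String[] args) {
        // With linger.ms = 100: a half-full batch that has waited 30 ms is not yet sendable...
        System.out.println(sendable(false, 30, 100));  // false
        // ...but a full batch is sendable immediately, regardless of linger.ms.
        System.out.println(sendable(true, 30, 100));   // true
        // The sender may block at most the remaining 70 ms before re-checking.
        System.out.println(timeLeftMs(30, 100));       // 70
    }
}
```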

Then we call send to put one record into memory. Since this creates a new RecordBatch, the sender thread is woken up.
KafkaProducer#doSend(...):

if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

This wakes up the sender thread blocked in selector#select(..). The sender thread runs accumulator#ready(..) again; since Deque&lt;RecordBatch&gt; now has an element, the nextReadyCheckDelayMs in the returned result is no longer the maximum value but a value derived from linger.ms. In other words, the selector now blocks at most linger.ms before returning, and then polls again.

That is: when Deque&lt;RecordBatch&gt; is non-empty, the sender thread blocks at most linger.ms; when it is empty, the sender thread blocks for Long.MAX_VALUE; and as soon as KafkaProducer#send(..) puts a record into memory and creates a new RecordBatch, the sender thread is woken up.

In addition, the KafkaProducer#doSend(...) snippet above shows that when a RecordBatch becomes full, Sender#wakeup(..) is also called. In summary: as soon as either linger.ms elapses or batch.size is reached, the sender thread is activated to send messages.
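The block-then-wakeup interaction described above can be demonstrated with java.nio's Selector directly. This is a toy sketch, not Kafka code: one thread plays the sender and blocks in select(), and the main thread plays doSend and calls wakeup() (we bound the block at 10 s rather than Long.MAX_VALUE to keep the demo finite):

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class SelectorWakeupDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        Selector selector = Selector.open();

        Thread sender = new Thread(() -> {
            try {
                // With no ready batches the real sender would block "forever";
                // here we block at most 10 s to keep the demo bounded.
                selector.select(10_000);
                System.out.println("sender woke up");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        sender.start();

        Thread.sleep(200);   // let the sender thread block inside select()
        selector.wakeup();   // what doSend triggers when a new batch is created
        sender.join(2_000);
        selector.close();
    }
}
```

wakeup() makes the blocked select() return immediately, which is exactly how a newly created (or full) RecordBatch forces the sender loop to re-run the ready check without waiting out its timeout.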
