1. 程式人生 > 實用技巧 >kafka生產者之記憶體池設計

kafka生產者之記憶體池設計

一個訊息被分割槽以後,訊息就會被放到一個快取裡面,我們看一下里面具體的細節。預設快取塊的大小是 32M,這個快取塊裡面有一個重要的資料結構:batches,這個資料結構是 key-value 的結果,key 就是訊息主題的分割槽,value 是一個佇列,裡面存的是傳送到對應分割槽的批次,Sender 執行緒就是把這些批次傳送到服務端

image.png

01 生產者高階設計之自定義資料結構

生產者把批次資訊用 batches 這個物件進行儲存。

Kafka 這兒採取的方式是自定義了一個數據結構:CopyOnWriteMap。

1、他們儲存的資訊的是 key-value 的結構,key 是分割槽,value 是要存到這個分割槽的對應批次(批次可能有多個,所以用的是佇列),故因為是 key-value 的資料結構,所以鎖定用 Map 資料結構。

2、這個 Kafka 生產者面臨的是一個高併發的場景,大量的訊息會湧入這個這個資料結構,所以這個資料結構需要保證執行緒安全,這樣我們就不能使用 HashMap 這樣的資料結構了。

3、這個資料結構需要支援的是讀多寫少的場景。讀多是因為每條訊息過來都會根據 key 讀取 value 的資訊,假如有 1000 萬條訊息,那麼就會讀取 batches 物件 1000 萬次。寫少是因為,比如我們生產者傳送資料需要往一個主題裡面去傳送資料,假設這個主題有 50 個分割槽,那麼這個 batches 裡面就需要寫 50 個 key-value 資料就可以了(大家要搞清楚我們雖然要寫 1000 萬條資料,但是這 1000 萬條是寫入 queue 佇列的 batch 裡的,並不是直接寫入 batches,所以就我們剛剛說的這個場景,batches 裡只需要最多寫 50 條資料就可以了)。

根據第二和第三個場景我們總結出來,Kafka 這兒需要一個能保證執行緒安全的,支援讀多寫少的 Map 資料結構。但是 Java 裡面並沒有提供出來的這樣的一個數據,唯一跟這個需求比較接近的是 CopyOnWriteArrayList,但是偏偏它又不是 Map 結構,所以 Kafka 這兒模仿 CopyOnWriteArrayList 設計了 CopyOnWriteMap。採用了讀寫分離的思想解決了執行緒安全且支援讀多寫少等問題。

高效的資料結構保證了生產者的效能。Kafka 生產者往 batches 裡插入資料的原始碼,生產者為了保證插入資料的高效能,採用了多執行緒,又為了執行緒安全,使用了分段加鎖等多種手段

02 生產者高階設計之記憶體池設計

剛剛我們看到 batches 裡面儲存的是批次,批次預設的大小是 16K,整個快取的大小是 32M,生產者每封裝一個批次都需要去申請記憶體,正常情況下如果一個批次傳送出去了以後,那麼這 16K 的記憶體就等著 GC 來回收了。但是如果是這樣的話,就可能會頻繁的引發 FullGC,故而影響生產者的效能,所以在快取裡面設計了一個記憶體池(類似於我們平時用的資料庫的連線池),一個 16K 的記憶體用完了以後,把資料清空,放入到記憶體池裡,下個批次用的時候直接從裡面獲取就可以。這樣大大的減少了 GC 的頻率,保證了生產者的穩定和高效

以下基於程式碼解釋:

Kafka為了提高吞吐量,Producer採用批量傳送資料的模式。Producer可以通過配置設定整個batch緩衝區的大小以及每一個batch的大小:

buffer.memory=         //預設32MB
batch.size=            //預設16KB
long nonPooledAvailableMemory
Deque<ByteBuffer> free           //可用的byteBuffer空間,每一個ByteBuffer大小就是上面配置的batch.size

整個緩衝區可用的空間 = nonPooledAvailableMemory + free * batch.size

在最開始整個緩衝區中free是空的,所有的記憶體空間都在nonPooledAvailableMemory中,每要建立一個batch(batch的大小正好是batch.size)就會從nonPooledAvailableMemory獲取空間,用完釋放空間時,空間不會回收到nonPooledAvailableMemory中,而是將ByteBuffer放到free中。那麼當下一次需要建立batch的時候,如果free中有沒有使用的ByteBuffer,就直接從free中獲取。

而對於需要建立size大於batch.size的batch時,永遠都是直接從nonPooledAvailableMemory獲取空間,並且釋放時放回nonPooledAvailableMemory中。如果nonPooledAvailableMemory不夠時,會從free中釋放一些ByteBuffer給nonPooledAvailableMemory。

為什麼有一些batch size不是配置的size大小呢?

因為有一些訊息體本身很大,大小超過batch.size,這些訊息每一條會建立一個ProducerBatch。

Kafka Producer單條訊息最大預設是1MB。

對於這些訊息,Kafka Producer其實相當於是一條一條傳送訊息,並且在BufferPool中並沒有很好的利用ByteBuffer,所以他們會影響Kafka Producer的吞吐量的。

所以在實際的生產環境中要根據訊息的大小調整batch.size的大小

如果nonPooledAvailableMemory + free * batch.size的大小也不夠建立batch時,程式就會等待別的正在使用的batch釋放空間,這個block時間預設是1min。

image.png

申請空間

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
... ...
try {
    //如何申請的資源正好是BATCH.SIZE的大小,並且free中有沒有使用的ByteBuffer,直接使用
    if (size == poolableSize && !this.free.isEmpty())
        return this.free.pollFirst();
    int freeListSize = freeSize() * this.poolableSize;
    //總的可以使用的記憶體大小 = this.nonPooledAvailableMemory + freeListSize
    if (this.nonPooledAvailableMemory + freeListSize >= size) {
        //nonPooledAvailableMemory記憶體不夠用時,釋放掉一些free中的byteBuffer,知道夠size用
        freeUp(size);
        this.nonPooledAvailableMemory -= size;
    } else {
        //資源不夠用的時候,等待資源
        int accumulated = 0;
        Condition moreMemory = this.lock.newCondition();
        try {
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            //迴圈直到記憶體夠用
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    //等待有記憶體資源釋放
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } finally {
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    recordWaitTime(timeNs);
                }
                if (this.closed)
                    throw new KafkaException("Producer closed while allocating memory");
                if (waitingTimeElapsed) {
                    //等待超時報錯
                    this.metrics.sensor("buffer-exhausted-records").record();
                    throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }
                remainingTimeToBlockNs -= timeNs;
                //因為上面其他batch釋放了資源,所以在此嘗試獲取資源
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    //如果是batch.size,並且釋放的資源使得free部位空時,就從free中獲取byteBuffer直接使用
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    //nonPooledAvailableMemory記憶體不夠用時,釋放掉一些free中的byteBuffer給nonPooledAvailableMemory,知道夠size用
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                    this.nonPooledAvailableMemory -= got;
                    accumulated += got;
                }
            }
            accumulated = 0;
        } finally {
            //沒有成功獲取到資源,把獲取的一部分資源交還給nonPooledAvailableMemory
            this.nonPooledAvailableMemory += accumulated;
            this.waiters.remove(moreMemory);
        }
    }
} finally {
    try {
        //有記憶體資源剩餘的時候,通知的資源等候著
        if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
        this.waiters.peekFirst().signal();
    } finally {
        lock.unlock();
    }
}
if (buffer == null)
    return safeAllocateByteBuffer(size);
else
    return buffer;
}
private void freeUp(int size) {
    //nonPooledAvailableMemory記憶體不夠用時,釋放掉一些free中的byteBuffer,直到夠size用
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
    this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

釋放空間

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            //batch.size大小的資源直接放到free中
            buffer.clear();
        this.free.add(buffer);
    } else {
        //不是batch.size大小的資源放到nonPooledAvailableMemory中
        this.nonPooledAvailableMemory += size;
    }
    Condition moreMem = this.waiters.peekFirst();
    if (moreMem != null)
        moreMem.signal();
    } finally {
        lock.unlock();
    }
}