1. 程式人生 > 程式設計 >聊聊rocketmq的TransientStorePool

聊聊rocketmq的TransientStorePool

本文主要研究一下rocketmq的TransientStorePool

TransientStorePool

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java

public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer,new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer,new NativeLong(fileSize));
        }
    }

    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.",availableBuffers.size());
        }
        return buffer;
    }

    public int availableBufferNums() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}
複製程式碼
  • TransientStorePool的構造器會根據MessageStoreConfig設定poolSize、fileSize屬性;其init方法會建立poolSize個byteBuffer放入到availableBuffers中;其destroy方法會遍歷availableBuffers,然後取出其address進行LibC.INSTANCE.munlock
  • borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法會執行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然後offerFirst方法放入availableBuffers
  • availableBufferNums方法在storeConfig.isTransientStorePoolEnable()為true的情況下會返回availableBuffers.size(),否則返回Integer.MAX_VALUE

isTransientStorePoolEnable

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

public class MessageStoreConfig {
    //The root directory in
which the log data is kept @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; //The directory in which the commitlog is kept @ImportantField private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; //...... @ImportantField private boolean transientStorePoolEnable = false; //...... /** * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is * ASYNC_FLUSH * * @return <tt>true</tt> or <tt>false</tt> */ public boolean isTransientStorePoolEnable() { return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType() && BrokerRole.SLAVE != getBrokerRole(); } public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) { this.transientStorePoolEnable = transientStorePoolEnable; } //...... } 複製程式碼
  • MessageStoreConfig定義了transientStorePoolEnable屬性,預設為false;其isTransientStorePoolEnable方法在transientStorePoolEnable為true且flushDiskType為FlushDiskType.ASYNC_FLUSH且brokerRole不為BrokerRole.SLAVE的時候返回true

小結

  • TransientStorePool的構造器會根據MessageStoreConfig設定poolSize、fileSize屬性;其init方法會建立poolSize個byteBuffer放入到availableBuffers中;其destroy方法會遍歷availableBuffers,然後取出其address進行LibC.INSTANCE.munlock
  • borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法會執行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然後offerFirst方法放入availableBuffers
  • availableBufferNums方法在storeConfig.isTransientStorePoolEnable()為true的情況下會返回availableBuffers.size(),否則返回Integer.MAX_VALUE

doc