聊聊rocketmq的TransientStorePool
阿新 • • 發佈:2020-06-24
序
本文主要研究一下rocketmq的TransientStorePool
TransientStorePool
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
public class TransientStorePool {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final int poolSize;
private final int fileSize;
private final Deque<ByteBuffer> availableBuffers;
private final MessageStoreConfig storeConfig;
public TransientStorePool(final MessageStoreConfig storeConfig) {
this.storeConfig = storeConfig;
this.poolSize = storeConfig.getTransientStorePoolSize();
this.fileSize = storeConfig.getMappedFileSizeCommitLog();
this.availableBuffers = new ConcurrentLinkedDeque<>();
}
/**
* It's a heavy init method.
*/
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer,new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
public void destroy() {
for (ByteBuffer byteBuffer : availableBuffers) {
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.munlock(pointer,new NativeLong(fileSize));
}
}
public void returnBuffer(ByteBuffer byteBuffer) {
byteBuffer.position(0);
byteBuffer.limit(fileSize);
this.availableBuffers.offerFirst(byteBuffer);
}
public ByteBuffer borrowBuffer() {
ByteBuffer buffer = availableBuffers.pollFirst();
if (availableBuffers.size() < poolSize * 0.4) {
log.warn("TransientStorePool only remain {} sheets.",availableBuffers.size());
}
return buffer;
}
public int availableBufferNums() {
if (storeConfig.isTransientStorePoolEnable()) {
return availableBuffers.size();
}
return Integer.MAX_VALUE;
}
}
複製程式碼
- TransientStorePool的構造器會根據MessageStoreConfig設定poolSize、fileSize屬性;其init方法會建立poolSize個byteBuffer放入到availableBuffers中;其destroy方法會遍歷availableBuffers,然後取出其address進行LibC.INSTANCE.munlock
- borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法會執行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然後offerFirst方法放入availableBuffers
- availableBufferNums方法在storeConfig.isTransientStorePoolEnable()為true的情況下會返回availableBuffers.size(),否則返回Integer.MAX_VALUE
isTransientStorePoolEnable
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
public class MessageStoreConfig {
//The root directory in which the log data is kept
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//The directory in which the commitlog is kept
@ImportantField
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
//......
@ImportantField
private boolean transientStorePoolEnable = false;
//......
/**
* Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
* ASYNC_FLUSH
*
* @return <tt>true</tt> or <tt>false</tt>
*/
public boolean isTransientStorePoolEnable() {
return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
&& BrokerRole.SLAVE != getBrokerRole();
}
public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
this.transientStorePoolEnable = transientStorePoolEnable;
}
//......
}
複製程式碼
- MessageStoreConfig定義了transientStorePoolEnable屬性,預設為false;其isTransientStorePoolEnable方法在transientStorePoolEnable為true且flushDiskType為FlushDiskType.ASYNC_FLUSH且brokerRole不為BrokerRole.SLAVE的時候返回true
小結
- TransientStorePool的構造器會根據MessageStoreConfig設定poolSize、fileSize屬性;其init方法會建立poolSize個byteBuffer放入到availableBuffers中;其destroy方法會遍歷availableBuffers,然後取出其address進行LibC.INSTANCE.munlock
- borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法會執行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然後offerFirst方法放入availableBuffers
- availableBufferNums方法在storeConfig.isTransientStorePoolEnable()為true的情況下會返回availableBuffers.size(),否則返回Integer.MAX_VALUE