聊聊rocketmq的TransientStorePool

序
本文主要研究一下rocketmq的TransientStorePool
TransientStorePool
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
public class TransientStorePool {    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;
    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }
    /**
     * It"s a heavy init method.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
            availableBuffers.offer(byteBuffer);
        }
    }
    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }
    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }
    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }
    public int availableBufferNums() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}
- TransientStorePool的构造器会根据MessageStoreConfig设置poolSize、fileSize属性;其init方法会创建poolSize个byteBuffer放入到availableBuffers中;其destroy方法会遍历availableBuffers,然后取出其address进行LibC.INSTANCE.munlock
 - borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法会执行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然后offerFirst方法放入availableBuffers
 - availableBufferNums方法在storeConfig.isTransientStorePoolEnable()为true的情况下会返回availableBuffers.size(),否则返回Integer.MAX_VALUE
 
isTransientStorePoolEnable
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
public class MessageStoreConfig {    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
    //The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";
    //......
    @ImportantField
    private boolean transientStorePoolEnable = false;
    //......
    /**
     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
     * ASYNC_FLUSH
     *
     * @return <tt>true</tt> or <tt>false</tt>
     */
    public boolean isTransientStorePoolEnable() {
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }
    public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }
    //......
}
- MessageStoreConfig定义了transientStorePoolEnable属性,默认为false;其isTransientStorePoolEnable方法在transientStorePoolEnable为true且flushDiskType为FlushDiskType.ASYNC_FLUSH且brokerRole不为BrokerRole.SLAVE的时候返回true
 
小结
- TransientStorePool的构造器会根据MessageStoreConfig设置poolSize、fileSize属性;其init方法会创建poolSize个byteBuffer放入到availableBuffers中;其destroy方法会遍历availableBuffers,然后取出其address进行LibC.INSTANCE.munlock
 - borrowBuffer返回availableBuffers.pollFirst(),returnBuffer方法会执行byteBuffer.position(0)以及byteBuffer.limit(fileSize),然后offerFirst方法放入availableBuffers
 - availableBufferNums方法在storeConfig.isTransientStorePoolEnable()为true的情况下会返回availableBuffers.size(),否则返回Integer.MAX_VALUE
 
doc
- TransientStorePool
 
以上是 聊聊rocketmq的TransientStorePool 的全部内容, 来源链接: utcz.com/z/511579.html


