A Look at RocketMQ's TransientStorePool


This article takes a look at RocketMQ's TransientStorePool.

TransientStorePool

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java

public class TransientStorePool {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMappedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It's a heavy init method.
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }

    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }

    public int availableBufferNums() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}

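  • The TransientStorePool constructor sets poolSize and fileSize from MessageStoreConfig. Its init method allocates poolSize direct ByteBuffers of fileSize bytes each, pins each one in memory with LibC.INSTANCE.mlock, and adds it to availableBuffers. Its destroy method iterates over availableBuffers, takes each buffer's address, and calls LibC.INSTANCE.munlock on it.
  • borrowBuffer returns availableBuffers.pollFirst() and logs a warning when fewer than 40% of the buffers remain. returnBuffer calls byteBuffer.position(0) and byteBuffer.limit(fileSize), then puts the buffer back into availableBuffers with offerFirst.
  • availableBufferNums returns availableBuffers.size() when storeConfig.isTransientStorePoolEnable() is true; otherwise it returns Integer.MAX_VALUE.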
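The borrow/return behaviour described above can be tried out with a small stand-alone sketch. It is not the RocketMQ class: the name BufferPoolSketch and the tiny sizes are made up for illustration, and the mlock/munlock calls (which need JNA) and the low-water warning are left out. It only shows two things that follow from the code above: pollFirst returns null once the pool is empty, and returnBuffer resets position and limit without clearing the old bytes.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical, simplified stand-in for TransientStorePool (no mlock, no logging).
public class BufferPoolSketch {
    private final int poolSize;
    private final int bufferSize;
    private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    public BufferPoolSketch(int poolSize, int bufferSize) {
        this.poolSize = poolSize;
        this.bufferSize = bufferSize;
    }

    // Pre-allocate every direct buffer up front, as init() does in the original.
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            availableBuffers.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    // pollFirst() returns null when the deque is empty, so callers must check.
    public ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    // Only position and limit are reset; the buffer contents are not zeroed.
    public void returnBuffer(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(bufferSize);
        availableBuffers.offerFirst(buffer);
    }

    public int available() {
        return availableBuffers.size();
    }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(2, 16);
        pool.init();

        ByteBuffer a = pool.borrowBuffer();
        ByteBuffer b = pool.borrowBuffer();
        ByteBuffer c = pool.borrowBuffer();          // pool is exhausted here
        System.out.println("third borrow is null: " + (c == null));

        a.put((byte) 42);                            // advances position to 1
        pool.returnBuffer(a);                        // goes back to the head of the deque
        ByteBuffer again = pool.borrowBuffer();      // offerFirst + pollFirst hands back the same buffer
        System.out.println("same instance: " + (again == a));
        System.out.println("position after return: " + again.position());
        System.out.println("old byte still there: " + again.get(0));
    }
}
```

Because offerFirst and pollFirst both work on the head of the deque, the most recently returned buffer is the next one handed out.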

isTransientStorePoolEnable

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

public class MessageStoreConfig {
    //The root directory in which the log data is kept
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

    //The directory in which the commitlog is kept
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";

    //......

    @ImportantField
    private boolean transientStorePoolEnable = false;

    //......

    /**
     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
     * ASYNC_FLUSH
     *
     * @return <tt>true</tt> or <tt>false</tt>
     */
    public boolean isTransientStorePoolEnable() {
        return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
            && BrokerRole.SLAVE != getBrokerRole();
    }

    public void setTransientStorePoolEnable(final boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }

    //......
}

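  • MessageStoreConfig defines the transientStorePoolEnable property, which defaults to false. Its isTransientStorePoolEnable method returns true only when transientStorePoolEnable is true, flushDiskType is FlushDiskType.ASYNC_FLUSH, and brokerRole is not BrokerRole.SLAVE. (The Javadoc mentions only the first two conditions, but the code also checks the broker role.)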
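To make the three conditions concrete, here is a minimal sketch that copies the boolean expression into a stand-alone class and prints the result for every combination. The class name TransientPoolEnableSketch and the two nested enums are local stand-ins, not the RocketMQ types; the helper availableBufferNums likewise only mirrors the branch shown earlier in TransientStorePool.

```java
// Hypothetical stand-alone copy of the enable check; the enums are local stand-ins.
public class TransientPoolEnableSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }
    enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    // Same expression as MessageStoreConfig.isTransientStorePoolEnable(), with the fields passed in.
    static boolean isTransientStorePoolEnable(boolean flag, FlushDiskType flush, BrokerRole role) {
        return flag && FlushDiskType.ASYNC_FLUSH == flush && BrokerRole.SLAVE != role;
    }

    // Mirrors TransientStorePool.availableBufferNums(): the real count only when the pool is in effect.
    static int availableBufferNums(boolean effectiveEnable, int pooled) {
        return effectiveEnable ? pooled : Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Enumerate every combination of the three inputs.
        for (boolean flag : new boolean[] {false, true}) {
            for (FlushDiskType flush : FlushDiskType.values()) {
                for (BrokerRole role : BrokerRole.values()) {
                    System.out.println(flag + " / " + flush + " / " + role + " -> "
                        + isTransientStorePoolEnable(flag, flush, role));
                }
            }
        }
    }
}
```

Of the twelve combinations, only the flag set to true with ASYNC_FLUSH on a SYNC_MASTER or ASYNC_MASTER broker yields true. In every other case availableBufferNums reports Integer.MAX_VALUE.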

Summary

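  • The TransientStorePool constructor sets poolSize and fileSize from MessageStoreConfig. Its init method allocates poolSize direct ByteBuffers of fileSize bytes each, pins each one in memory with LibC.INSTANCE.mlock, and adds it to availableBuffers. Its destroy method iterates over availableBuffers, takes each buffer's address, and calls LibC.INSTANCE.munlock on it.
  • borrowBuffer returns availableBuffers.pollFirst() and logs a warning when fewer than 40% of the buffers remain. returnBuffer calls byteBuffer.position(0) and byteBuffer.limit(fileSize), then puts the buffer back into availableBuffers with offerFirst.
  • availableBufferNums returns availableBuffers.size() when storeConfig.isTransientStorePoolEnable() is true; otherwise it returns Integer.MAX_VALUE.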


