聊聊rocketmq的FlushConsumeQueueService

编程

本文主要研究一下rocketmq的FlushConsumeQueueService

FlushConsumeQueueService

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

    class FlushConsumeQueueService extends ServiceThread {

private static final int RETRY_TIMES_OVER = 3;

private long lastFlushTimestamp = 0;

private void doFlush(int retryTimes) {

int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

if (retryTimes == RETRY_TIMES_OVER) {

flushConsumeQueueLeastPages = 0;

}

long logicsMsgTimestamp = 0;

int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();

long currentTimeMillis = System.currentTimeMillis();

if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {

this.lastFlushTimestamp = currentTimeMillis;

flushConsumeQueueLeastPages = 0;

logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();

}

ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {

for (ConsumeQueue cq : maps.values()) {

boolean result = false;

for (int i = 0; i < retryTimes && !result; i++) {

result = cq.flush(flushConsumeQueueLeastPages);

}

}

}

if (0 == flushConsumeQueueLeastPages) {

if (logicsMsgTimestamp > 0) {

DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);

}

DefaultMessageStore.this.getStoreCheckpoint().flush();

}

}

public void run() {

DefaultMessageStore.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {

try {

int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();

this.waitForRunning(interval);

this.doFlush(1);

} catch (Exception e) {

DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);

}

}

this.doFlush(RETRY_TIMES_OVER);

DefaultMessageStore.log.info(this.getServiceName() + " service end");

}

@Override

public String getServiceName() {

return FlushConsumeQueueService.class.getSimpleName();

}

@Override

public long getJointime() {

return 1000 * 60;

}

}

  • FlushConsumeQueueService继承了ServiceThread,其run方法,首先通过DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue()获取interval,然后执行waitForRunning(interval),最后执行doFlush(1);doFlush方法,首先通过getMessageStoreConfig().getFlushConsumeQueueLeastPages()获取flushConsumeQueueLeastPages,之后会遍历consumeQueueTable,执行ConsumeQueue的flush(flushConsumeQueueLeastPages);最后执行getStoreCheckpoint().flush()

ConsumeQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

public class ConsumeQueue {

//......

public boolean flush(final int flushLeastPages) {

boolean result = this.mappedFileQueue.flush(flushLeastPages);

if (isExtReadEnable()) {

result = result & this.consumeQueueExt.flush(flushLeastPages);

}

return result;

}

//......

}

  • flush方法执行的是mappedFileQueue.flush(flushLeastPages),如果isExtReadEnable()为true则还会执行consumeQueueExt.flush(flushLeastPages)

MappedFileQueue

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java

public class MappedFileQueue {

//......

public boolean flush(final int flushLeastPages) {

boolean result = true;

MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);

if (mappedFile != null) {

long tmpTimeStamp = mappedFile.getStoreTimestamp();

int offset = mappedFile.flush(flushLeastPages);

long where = mappedFile.getFileFromOffset() + offset;

result = where == this.flushedWhere;

this.flushedWhere = where;

if (0 == flushLeastPages) {

this.storeTimestamp = tmpTimeStamp;

}

}

return result;

}

//......

}

  • MappedFileQueue的flush方法则主要执行mappedFile.flush(flushLeastPages)

MappedFile

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MappedFile.java

public class MappedFile extends ReferenceResource {

//......

public int flush(final int flushLeastPages) {

if (this.isAbleToFlush(flushLeastPages)) {

if (this.hold()) {

int value = getReadPosition();

try {

//We only append data to fileChannel or mappedByteBuffer, never both.

if (writeBuffer != null || this.fileChannel.position() != 0) {

this.fileChannel.force(false);

} else {

this.mappedByteBuffer.force();

}

} catch (Throwable e) {

log.error("Error occurred when force data to disk.", e);

}

this.flushedPosition.set(value);

this.release();

} else {

log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());

this.flushedPosition.set(getReadPosition());

}

}

return this.getFlushedPosition();

}

//......

}

  • MappedFile的flush方法主要是执行fileChannel.force(false)或者mappedByteBuffer.force()

小结

FlushConsumeQueueService继承了ServiceThread,其run方法,首先通过DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue()获取interval,然后执行waitForRunning(interval),最后执行doFlush(1);doFlush方法,首先通过getMessageStoreConfig().getFlushConsumeQueueLeastPages()获取flushConsumeQueueLeastPages,之后会遍历consumeQueueTable,执行ConsumeQueue的flush(flushConsumeQueueLeastPages);最后执行getStoreCheckpoint().flush()

doc

  • DefaultMessageStore

以上是 聊聊rocketmq的FlushConsumeQueueService 的全部内容, 来源链接: utcz.com/z/511867.html

回到顶部