聊聊rocketmq的CleanCommitLogService

编程

本文主要研究一下rocketmq的CleanCommitLogService

CleanCommitLogService

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

    class CleanCommitLogService {

// Value loaded into manualDeleteFileSeveralTimes each time excuteDeleteFilesManualy() is invoked.
private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;

// Disk usage ratio above which isSpaceToDelete() marks the disk as full in runningFlags
// and sets cleanImmediately. Read from the system property
// rocketmq.broker.diskSpaceWarningLevelRatio, falling back to 0.90.
private final double diskSpaceWarningLevelRatio =

Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));

// Disk usage ratio above which isSpaceToDelete() sets cleanImmediately without marking
// the disk full. Read from the system property
// rocketmq.broker.diskSpaceCleanForciblyRatio, falling back to 0.85.
private final double diskSpaceCleanForciblyRatio =

Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));

// Timestamp (System.currentTimeMillis()) of the last retry performed by redeleteHangedFile().
private long lastRedeleteTimestamp = 0;

// Remaining number of cleanup passes that count as manually triggered;
// decremented by deleteExpiredFiles() while it is greater than zero.
private volatile int manualDeleteFileSeveralTimes = 0;

// Recomputed on every isSpaceToDelete() call; combined with the
// isCleanFileForciblyEnable() config flag to form cleanAtOnce in deleteExpiredFiles().
private volatile boolean cleanImmediately = false;

/**
 * Requests a manual cleanup by arming the manual-delete counter with
 * {@code MAX_MANUAL_DELETE_FILE_TIMES}. While the counter is positive,
 * {@code deleteExpiredFiles()} treats each pass as manually triggered and
 * decrements the counter by one.
 */
public void excuteDeleteFilesManualy() {
    manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
    DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
}

/**
 * Performs one cleanup pass: first deletes expired commit log files, then
 * retries deletion of a hanged file. Any {@code Throwable} raised by either
 * step is caught and logged as a warning rather than propagated to the caller.
 */
public void run() {
    try {
        deleteExpiredFiles();
        redeleteHangedFile();
    } catch (Throwable t) {
        final String serviceName = getServiceName();
        DefaultMessageStore.log.warn(serviceName + " service has exception. ", t);
    }
}

/**
 * Deletes expired commit log files when at least one trigger holds: the
 * configured delete time has arrived, disk usage demands it, or a manual
 * delete has been requested. Does nothing when none of the triggers holds.
 */
private void deleteExpiredFiles() {
    // Read in hours; converted to milliseconds just before the delete call.
    final long reservedHours = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    final int deleteInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    final int forceDestroyInterval =
        DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // Both checks always run: isSpaceToDelete() also refreshes cleanImmediately
    // and the disk state held in runningFlags.
    final boolean timeup = this.isTimeToDelete();
    final boolean spacefull = this.isSpaceToDelete();
    final boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (!timeup && !spacefull && !manualDelete) {
        return;
    }

    if (manualDelete) {
        // Consume one of the remaining manually requested passes.
        this.manualDeleteFileSeveralTimes--;
    }

    final boolean cleanAtOnce =
        DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

    log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
        reservedHours,
        timeup,
        spacefull,
        manualDeleteFileSeveralTimes,
        cleanAtOnce);

    final long reservedMillis = reservedHours * (60 * 60 * 1000);
    final int deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(
        reservedMillis, deleteInterval, forceDestroyInterval, cleanAtOnce);

    // Only worth a warning when space pressure was the trigger and nothing was freed.
    if (deleteCount <= 0 && spacefull) {
        log.warn("disk space will be full soon, but delete file failed.");
    }
}

/**
 * Retries deletion of the first commit log file, at most once per
 * {@code getRedeleteHangedFileInterval()} milliseconds. The time of the last
 * attempt is tracked in {@code lastRedeleteTimestamp}.
 */
private void redeleteHangedFile() {
    final int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
    final long now = System.currentTimeMillis();

    if ((now - this.lastRedeleteTimestamp) <= interval) {
        // Not enough time has passed since the previous attempt.
        return;
    }

    this.lastRedeleteTimestamp = now;
    final int forceDestroyInterval =
        DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // The outcome of the retry is not acted upon here.
    DefaultMessageStore.this.commitLog.retryDeleteFirstFile(forceDestroyInterval);
}

/**
 * Returns the simple class name of this service; used by {@code run()} as the
 * prefix of its warning log message.
 *
 * @return the simple name of {@code CleanCommitLogService}
 */
public String getServiceName() {
    final String simpleName = CleanCommitLogService.class.getSimpleName();
    return simpleName;
}

private boolean isTimeToDelete() {

String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();

if (UtilAll.isItTimeToDo(when)) {

DefaultMessageStore.log.info("it"s time to reclaim disk space, " + when);

return true;

}

return false;

}

/**
 * Decides whether disk usage requires reclaiming space. The commit log
 * partition is checked first; the consume queue partition is only checked when
 * the commit log partition did not already demand a cleanup.
 *
 * <p>Side effects: resets {@code cleanImmediately} to {@code false} and then
 * lets each partition check raise it again; also updates the disk state held
 * in {@code runningFlags}.
 *
 * @return {@code true} if either checked partition reports a negative usage
 *         ratio or one above {@code getDiskMaxUsedSpaceRatio() / 100.0}
 */
private boolean isSpaceToDelete() {
    final double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;

    cleanImmediately = false;

    final String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    final double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
    if (checkDiskUsage("physic", physicRatio, ratio)) {
        return true;
    }

    final String storePathLogics = StorePathConfigHelper
        .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
    final double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
    return checkDiskUsage("logics", logicsRatio, ratio);
}

/**
 * Applies the usage thresholds to one disk partition: updates the full/OK flag
 * in {@code runningFlags}, raises {@code cleanImmediately} when usage exceeds
 * either class-level threshold, and reports whether space must be reclaimed.
 *
 * @param diskName     label used as the prefix of the log messages ("physic" or "logics")
 * @param usedRatio    measured usage ratio of the partition
 * @param maxUsedRatio configured maximum usage ratio
 * @return {@code true} if {@code usedRatio} is negative or greater than {@code maxUsedRatio}
 */
private boolean checkDiskUsage(final String diskName, final double usedRatio, final double maxUsedRatio) {
    if (usedRatio > diskSpaceWarningLevelRatio) {
        // Log only on the transition, i.e. when the flag previously said the disk was OK.
        final boolean wasDiskOk = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
        if (wasDiskOk) {
            DefaultMessageStore.log.error(diskName + " disk maybe full soon " + usedRatio + ", so mark disk full");
        }
        cleanImmediately = true;
    } else if (usedRatio > diskSpaceCleanForciblyRatio) {
        cleanImmediately = true;
    } else {
        // NOTE(review): when called for the second partition this can clear a
        // "disk full" mark set for the first one within the same pass - confirm intended.
        final boolean wasDiskOk = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
        if (!wasDiskOk) {
            DefaultMessageStore.log.info(diskName + " disk space OK " + usedRatio + ", so mark disk ok");
        }
    }

    // A negative ratio also triggers a reclaim; presumably it signals that the
    // usage could not be determined - TODO confirm against UtilAll.
    if (usedRatio < 0 || usedRatio > maxUsedRatio) {
        DefaultMessageStore.log.info(diskName + " disk maybe full soon, so reclaim space, " + usedRatio);
        return true;
    }

    return false;
}

/**
 * Returns the number of cleanup passes still counted as manually triggered.
 *
 * @return the current value of the manual-delete counter
 */
public int getManualDeleteFileSeveralTimes() {
    return this.manualDeleteFileSeveralTimes;
}

/**
 * Overwrites the manual-delete counter. A positive value makes subsequent
 * {@code deleteExpiredFiles()} passes count as manually triggered.
 *
 * @param manualDeleteFileSeveralTimes the new counter value
 */
public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
    this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
}

}

  • CleanCommitLogService的run方法会先执行deleteExpiredFiles,然后执行redeleteHangedFile;deleteExpiredFiles方法在isTimeToDelete、isSpaceToDelete、manualDelete三个条件任意一个成立的条件下会执行commitLog.deleteExpiredFile方法
  • redeleteHangedFile方法在(currentTimestamp - this.lastRedeleteTimestamp) > interval条件成立的情况下会执行commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)
  • isTimeToDelete方法是通过UtilAll.isItTimeToDo(when)判断;isSpaceToDelete则通过UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)计算physicRatio,在physicRatio > diskSpaceWarningLevelRatio时执行runningFlags.getAndMakeDiskFull()改变一下runningFlags状态;若UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics)大于diskSpaceWarningLevelRatio也会执行runningFlags.getAndMakeDiskFull()改变一下runningFlags状态;二者在使用率不超过diskSpaceCleanForciblyRatio时会执行runningFlags.getAndMakeDiskOK()

DefaultMessageStore

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

public class DefaultMessageStore implements MessageStore {

//......

/**
 * Registers the store's periodic background tasks on
 * {@code scheduledExecutorService}:
 * <ul>
 *   <li>file cleanup via {@code cleanFilesPeriodically()}: first run after 60 s,
 *       then every {@code getCleanResourceInterval()} milliseconds;</li>
 *   <li>{@code checkSelf()}: first run after 1 minute, then every 10 minutes;</li>
 *   <li>a once-per-second lock watchdog that, when {@code isDebugLockEnable()}
 *       is set, dumps a jstack to a file under the user's home directory if the
 *       commit log lock has been held for more than one second.</li>
 * </ul>
 */
private void addScheduleTask() {

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.checkSelf();
        }
    }, 1, 10, TimeUnit.MINUTES);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (!DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                return;
            }

            try {
                // Read once so that the zero check, the computed duration and the
                // file name all refer to the same lock acquisition.
                final long beginTimeInLock = DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                if (beginTimeInLock != 0) {
                    final long lockTime = System.currentTimeMillis() - beginTimeInLock;
                    if (lockTime > 1000 && lockTime < 10000000) {
                        final String stack = UtilAll.jstack();
                        final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
                            + beginTimeInLock + "-" + lockTime;
                        MixAll.string2FileNotSafe(stack, fileName);
                    }
                }
            } catch (Exception e) {
                // Must not propagate: an exception escaping a scheduleAtFixedRate task
                // suppresses all of its later runs. Log it instead of dropping it silently.
                DefaultMessageStore.log.warn("dump stack for debug lock failed", e);
            }
        }
    }, 1, 1, TimeUnit.SECONDS);

    // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    // @Override
    // public void run() {
    // DefaultMessageStore.this.cleanExpiredConsumerQueue();
    // }
    // }, 1, 1, TimeUnit.HOURS);
}

/**
 * Runs one cleanup pass of both file-cleaning services: the commit log
 * cleaner first, then the consume queue cleaner.
 */
private void cleanFilesPeriodically() {
    cleanCommitLogService.run();
    cleanConsumeQueueService.run();
}

//......

}

  • addScheduleTask方法会注册一个定时任务,每隔messageStoreConfig.getCleanResourceInterval()毫秒执行一次cleanFilesPeriodically()方法;而cleanFilesPeriodically方法则会执行cleanCommitLogService.run()方法和cleanConsumeQueueService.run()方法

小结

CleanCommitLogService的run方法会先执行deleteExpiredFiles,然后执行redeleteHangedFile;deleteExpiredFiles方法在isTimeToDelete、isSpaceToDelete、manualDelete三个条件任意一个成立的条件下会执行commitLog.deleteExpiredFile方法

doc

  • DefaultMessageStore

以上是 聊聊rocketmq的CleanCommitLogService 的全部内容, 来源链接: utcz.com/z/511841.html

回到顶部