聊聊rocketmq的DLedgerRoleChangeHandler

编程

本文主要研究一下rocketmq的DLedgerRoleChangeHandler

DLedgerRoleChangeHandler

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java

public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));

private BrokerController brokerController;

private DefaultMessageStore messageStore;

private DLedgerCommitLog dLedgerCommitLog;

private DLedgerServer dLegerServer;

public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {

this.brokerController = brokerController;

this.messageStore = messageStore;

this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();

this.dLegerServer = dLedgerCommitLog.getdLedgerServer();

}

@Override public void handle(long term, MemberState.Role role) {

Runnable runnable = new Runnable() {

@Override public void run() {

long start = System.currentTimeMillis();

try {

boolean succ = true;

log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());

switch (role) {

case CANDIDATE:

if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {

brokerController.changeToSlave(dLedgerCommitLog.getId());

}

break;

case FOLLOWER:

brokerController.changeToSlave(dLedgerCommitLog.getId());

break;

case LEADER:

while (true) {

if (!dLegerServer.getMemberState().isLeader()) {

succ = false;

break;

}

if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {

break;

}

if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()

&& messageStore.dispatchBehindBytes() == 0) {

break;

}

Thread.sleep(100);

}

if (succ) {

messageStore.recoverTopicQueueTable();

brokerController.changeToMaster(BrokerRole.SYNC_MASTER);

}

break;

default:

break;

}

log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));

} catch (Throwable t) {

log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);

}

}

};

executorService.submit(runnable);

}

@Override public void startup() {

}

@Override public void shutdown() {

executorService.shutdown();

}

}

  • DLedgerRoleChangeHandler实现了DLedgerLeaderElector.RoleChangeHandler接口,其handle方法会往executorService提交一个runnable;其shutdown方法会执行executorService.shutdown();runnable方法会根据MemberState.Role做不同处理,在role为CANDIDATE且messageStore.getMessageStoreConfig().getBrokerRole()不为SLAVE的时候会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为FOLLOWER时会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为LEADER时执行messageStore.recoverTopicQueueTable()及brokerController.changeToMaster(BrokerRole.SYNC_MASTER)

changeToSlave

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

//......

public void changeToSlave(int brokerId) {

log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

//change the role

brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check

messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

//handle the scheduled service

try {

this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);

} catch (Throwable t) {

log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);

}

//handle the transactional service

try {

this.shutdownProcessorByHa();

} catch (Throwable t) {

log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);

}

//handle the slave synchronise

handleSlaveSynchronize(BrokerRole.SLAVE);

try {

this.registerBrokerAll(true, true, brokerConfig.isForceRegister());

} catch (Throwable ignored) {

}

log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

}

//......

}

  • changeToSlave方法主要执行messageStoreConfig.setBrokerRole(BrokerRole.SLAVE),然后执行shutdownProcessorByHa()、handleSlaveSynchronize(BrokerRole.SLAVE)以及registerBrokerAll(true, true, brokerConfig.isForceRegister())

changeToMaster

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

//......

public void changeToMaster(BrokerRole role) {

if (role == BrokerRole.SLAVE) {

return;

}

log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());

//handle the slave synchronise

handleSlaveSynchronize(role);

//handle the scheduled service

try {

this.messageStore.handleScheduleMessageService(role);

} catch (Throwable t) {

log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);

}

//handle the transactional service

try {

this.startProcessorByHa(BrokerRole.SYNC_MASTER);

} catch (Throwable t) {

log.error("[MONITOR] startProcessorByHa failed when changing to master", t);

}

//if the operations above are totally successful, we change to master

brokerConfig.setBrokerId(0); //TO DO check

messageStoreConfig.setBrokerRole(role);

try {

this.registerBrokerAll(true, true, brokerConfig.isForceRegister());

} catch (Throwable ignored) {

}

log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());

}

//......

}

  • changeToMaster方法执行handleSlaveSynchronize、startProcessorByHa(BrokerRole.SYNC_MASTER)以及registerBrokerAll(true, true, brokerConfig.isForceRegister())

小结

DLedgerRoleChangeHandler实现了DLedgerLeaderElector.RoleChangeHandler接口,其handle方法会往executorService提交一个runnable;其shutdown方法会执行executorService.shutdown();runnable方法会根据MemberState.Role做不同处理,在role为CANDIDATE且messageStore.getMessageStoreConfig().getBrokerRole()不为SLAVE的时候会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为FOLLOWER时会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为LEADER时执行messageStore.recoverTopicQueueTable()及brokerController.changeToMaster(BrokerRole.SYNC_MASTER)

doc

  • DLedgerRoleChangeHandler

以上是 聊聊rocketmq的DLedgerRoleChangeHandler 的全部内容, 来源链接: utcz.com/z/512279.html

回到顶部