聊聊rocketmq的SlaveSynchronize

编程

本文主要研究一下rocketmq的SlaveSynchronize

BrokerController

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

//......

private void handleSlaveSynchronize(BrokerRole role) {

if (role == BrokerRole.SLAVE) {

if (null != slaveSyncFuture) {

slaveSyncFuture.cancel(false);

}

this.slaveSynchronize.setMasterAddr(null);

slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.slaveSynchronize.syncAll();

}

catch (Throwable e) {

log.error("ScheduledTask SlaveSynchronize syncAll error.", e);

}

}

}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);

} else {

//handle the slave synchronise

if (null != slaveSyncFuture) {

slaveSyncFuture.cancel(false);

}

this.slaveSynchronize.setMasterAddr(null);

}

}

//......

}

  • BrokerController有个handleSlaveSynchronize方法,在role为BrokerRole.SLAVE的时候,会注册一个定时任务,每隔10秒钟执行一次BrokerController.this.slaveSynchronize.syncAll()

SlaveSynchronize

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {

private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

private final BrokerController brokerController;

private volatile String masterAddr = null;

public SlaveSynchronize(BrokerController brokerController) {

this.brokerController = brokerController;

}

public String getMasterAddr() {

return masterAddr;

}

public void setMasterAddr(String masterAddr) {

this.masterAddr = masterAddr;

}

public void syncAll() {

this.syncTopicConfig();

this.syncConsumerOffset();

this.syncDelayOffset();

this.syncSubscriptionGroupConfig();

}

//......

}

  • SlaveSynchronize的syncAll方法分别调用了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法

syncTopicConfig

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {

//......

private void syncTopicConfig() {

String masterAddrBak = this.masterAddr;

if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {

try {

TopicConfigSerializeWrapper topicWrapper =

this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);

if (!this.brokerController.getTopicConfigManager().getDataVersion()

.equals(topicWrapper.getDataVersion())) {

this.brokerController.getTopicConfigManager().getDataVersion()

.assignNewOne(topicWrapper.getDataVersion());

this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();

this.brokerController.getTopicConfigManager().getTopicConfigTable()

.putAll(topicWrapper.getTopicConfigTable());

this.brokerController.getTopicConfigManager().persist();

log.info("Update slave topic config from master, {}", masterAddrBak);

}

} catch (Exception e) {

log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);

}

}

}

//......

}

  • syncTopicConfig方法从this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak)方法获取TopicConfigSerializeWrapper,之后判断其dataVersion是否与this.brokerController.getTopicConfigManager().getDataVersion()相同,不同的话则使用wrapper的数据更新brokerController.getTopicConfigManager(),然后持久化

syncConsumerOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {

//......

private void syncConsumerOffset() {

String masterAddrBak = this.masterAddr;

if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {

try {

ConsumerOffsetSerializeWrapper offsetWrapper =

this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);

this.brokerController.getConsumerOffsetManager().getOffsetTable()

.putAll(offsetWrapper.getOffsetTable());

this.brokerController.getConsumerOffsetManager().persist();

log.info("Update slave consumer offset from master, {}", masterAddrBak);

} catch (Exception e) {

log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);

}

}

}

//......

}

  • syncConsumerOffset方法从this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak)获取ConsumerOffsetSerializeWrapper,之后用其数据更新brokerController.getConsumerOffsetManager().getOffsetTable()并持久化

syncDelayOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {

//......

private void syncDelayOffset() {

String masterAddrBak = this.masterAddr;

if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {

try {

String delayOffset =

this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);

if (delayOffset != null) {

String fileName =

StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController

.getMessageStoreConfig().getStorePathRootDir());

try {

MixAll.string2File(delayOffset, fileName);

} catch (IOException e) {

log.error("Persist file Exception, {}", fileName, e);

}

}

log.info("Update slave delay offset from master, {}", masterAddrBak);

} catch (Exception e) {

log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);

}

}

}

//......

}

  • syncDelayOffset方法从this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak)获取delayOffset,然后使用MixAll.string2File(delayOffset, fileName)持久化到文件

syncSubscriptionGroupConfig

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {

//......

private void syncSubscriptionGroupConfig() {

String masterAddrBak = this.masterAddr;

if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {

try {

SubscriptionGroupWrapper subscriptionWrapper =

this.brokerController.getBrokerOuterAPI()

.getAllSubscriptionGroupConfig(masterAddrBak);

if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()

.equals(subscriptionWrapper.getDataVersion())) {

SubscriptionGroupManager subscriptionGroupManager =

this.brokerController.getSubscriptionGroupManager();

subscriptionGroupManager.getDataVersion().assignNewOne(

subscriptionWrapper.getDataVersion());

subscriptionGroupManager.getSubscriptionGroupTable().clear();

subscriptionGroupManager.getSubscriptionGroupTable().putAll(

subscriptionWrapper.getSubscriptionGroupTable());

subscriptionGroupManager.persist();

log.info("Update slave Subscription Group from master, {}", masterAddrBak);

}

} catch (Exception e) {

log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);

}

}

}

//......

}

  • syncSubscriptionGroupConfig方法从this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak)获取SubscriptionGroupWrapper,之后判断其dataVersion是否与this.brokerController.getSubscriptionGroupManager().getDataVersion()相同,不同的话则使用wrapper的数据更新subscriptionGroupManager.getSubscriptionGroupTable(),然后持久化

小结

BrokerController有个handleSlaveSynchronize方法,在role为BrokerRole.SLAVE的时候,会注册一个定时任务,每隔10秒钟执行一次BrokerController.this.slaveSynchronize.syncAll();SlaveSynchronize的syncAll方法分别调用了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法

doc

  • SlaveSynchronize

以上是 聊聊rocketmq的SlaveSynchronize 的全部内容, 来源链接: utcz.com/z/512221.html

回到顶部