聊聊rocketmq的SlaveSynchronize

序
本文主要研究一下rocketmq的SlaveSynchronize
BrokerController
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
public class BrokerController {	//......
    private void handleSlaveSynchronize(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.slaveSynchronize.syncAll();
                    }
                    catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }
    //......
}
- BrokerController有个handleSlaveSynchronize方法,在role为BrokerRole.SLAVE的时候,会注册一个定时任务,每隔10秒钟执行一次BrokerController.this.slaveSynchronize.syncAll()
 
SlaveSynchronize
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private volatile String masterAddr = null;
    public SlaveSynchronize(BrokerController brokerController) {
        this.brokerController = brokerController;
    }
    public String getMasterAddr() {
        return masterAddr;
    }
    public void setMasterAddr(String masterAddr) {
        this.masterAddr = masterAddr;
    }
    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }
    //......
}
- SlaveSynchronize的syncAll方法分别调用了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法
 
syncTopicConfig
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {	//......
    private void syncTopicConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {
                    this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                    this.brokerController.getTopicConfigManager().persist();
                    log.info("Update slave topic config from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
            }
        }
    }
	//......
}
- syncTopicConfig方法从this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak)方法获取TopicConfigSerializeWrapper,之后判断其dataVersion是否与this.brokerController.getTopicConfigManager().getDataVersion()相同,不同的话则使用wrapper的数据更新brokerController.getTopicConfigManager(),然后持久化
 
syncConsumerOffset
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {	//......
    private void syncConsumerOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
                this.brokerController.getConsumerOffsetManager().persist();
                log.info("Update slave consumer offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
            }
        }
    }
	//......
}
- syncConsumerOffset方法从this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak)获取ConsumerOffsetSerializeWrapper,之后用其数据更新brokerController.getConsumerOffsetManager().getOffsetTable()并持久化
 
syncDelayOffset
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {	//......
    private void syncDelayOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                String delayOffset =
                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
                if (delayOffset != null) {
                    String fileName =
                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                            .getMessageStoreConfig().getStorePathRootDir());
                    try {
                        MixAll.string2File(delayOffset, fileName);
                    } catch (IOException e) {
                        log.error("Persist file Exception, {}", fileName, e);
                    }
                }
                log.info("Update slave delay offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
            }
        }
    }
	//......
}
- syncDelayOffset方法从this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak)获取delayOffset,然后使用MixAll.string2File(delayOffset, fileName)持久化到文件
 
syncSubscriptionGroupConfig
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
public class SlaveSynchronize {	//......
    private void syncSubscriptionGroupConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null  && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                SubscriptionGroupWrapper subscriptionWrapper =
                    this.brokerController.getBrokerOuterAPI()
                        .getAllSubscriptionGroupConfig(masterAddrBak);
                if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                    .equals(subscriptionWrapper.getDataVersion())) {
                    SubscriptionGroupManager subscriptionGroupManager =
                        this.brokerController.getSubscriptionGroupManager();
                    subscriptionGroupManager.getDataVersion().assignNewOne(
                        subscriptionWrapper.getDataVersion());
                    subscriptionGroupManager.getSubscriptionGroupTable().clear();
                    subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                        subscriptionWrapper.getSubscriptionGroupTable());
                    subscriptionGroupManager.persist();
                    log.info("Update slave Subscription Group from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
            }
        }
    }
	//......
}
- syncSubscriptionGroupConfig方法从this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak)获取SubscriptionGroupWrapper,之后判断其dataVersion是否与this.brokerController.getSubscriptionGroupManager().getDataVersion()相同,不同的话则使用wrapper的数据更新subscriptionGroupManager.getSubscriptionGroupTable(),然后持久化
 
小结
BrokerController有个handleSlaveSynchronize方法,在role为BrokerRole.SLAVE的时候,会注册一个定时任务,每隔10秒钟执行一次BrokerController.this.slaveSynchronize.syncAll();SlaveSynchronize的syncAll方法分别调用了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法
doc
- SlaveSynchronize
 
以上是 聊聊rocketmq的SlaveSynchronize 的全部内容, 来源链接: utcz.com/z/512221.html


