A Look at chronos's MasterElection

This article takes a look at the MasterElection class in chronos.

MasterElection

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/ha/MasterElection.java

public class MasterElection {
    private static final Logger SWITCH_LOGGER = LogUtils.SWITCH_LOGGER;

    private static volatile ServerState state = ServerState.BACKUPING;

    public static void election(final CountDownLatch cdl) {
        final CuratorFramework client = ZkUtils.getCuratorClient();
        final LeaderSelector selector = new LeaderSelector(client, Constants.MASTER_PATH, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                SWITCH_LOGGER.info("take master leadership");

                long seekTimestamp = MetaService.getSeekTimestamp();
                long zkSeekTimestamp = MetaService.getZkSeekTimestamp();
                final long sleepMs = 200;
                long sleepCount = 0;
                // If the data on zk is lost, zkSeekTimestamp is 0; chronos would be blocked in that case
                while (seekTimestamp < zkSeekTimestamp && zkSeekTimestamp > 0) {
                    SWITCH_LOGGER.info("sleep {}ms to wait seekTimestamp:{} to catch up with zkSeekTimestamp:{}",
                            sleepMs, seekTimestamp, zkSeekTimestamp);
                    TimeUnit.MILLISECONDS.sleep(sleepMs);
                    seekTimestamp = MetaService.getSeekTimestamp();
                    zkSeekTimestamp = MetaService.getZkSeekTimestamp();
                    sleepCount++;
                }

                state = ServerState.MASTERING;
                SWITCH_LOGGER.info("change server state to {}, totalSleepMs:{}ms", state, sleepCount * sleepMs);
                cdl.await();
                state = ServerState.BACKUPING;
                SWITCH_LOGGER.info("release master leadership");
            }
        });
        selector.autoRequeue();
        selector.start();
    }

    public static boolean isMaster() {
        return state == ServerState.MASTERING;
    }

    public static boolean isBackup() {
        return state == ServerState.BACKUPING;
    }

    public static void standAlone() {
        state = ServerState.MASTERING;
    }

    public static ServerState getState() {
        return state;
    }
}

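  • MasterElection exposes the election, isMaster, isBackup, standAlone, and getState methods. The election method uses LeaderSelector from curator recipes. The takeLeadership method of its LeaderSelectorListenerAdapter first reads seekTimestamp and zkSeekTimestamp, then loops, sleeping 200 ms per iteration, until seekTimestamp is greater than or equal to zkSeekTimestamp (or zkSeekTimestamp is not positive). It then sets state to ServerState.MASTERING and blocks on the CountDownLatch's await method; once the latch is released, it sets state back to ServerState.BACKUPING and returns, which releases leadership. After creating the LeaderSelector, election calls its autoRequeue and start methods.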
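The catch-up wait described above can be tried in isolation. The following is only a minimal sketch, not the chronos code: CatchUpSketch and waitForCatchUp are hypothetical names, and the two LongSupplier parameters are assumed stand-ins for MetaService.getSeekTimestamp() and MetaService.getZkSeekTimestamp(), so it runs without ZooKeeper or Curator.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical, simplified stand-in for the wait loop inside takeLeadership. */
class CatchUpSketch {

    /**
     * Polls both timestamps until the local one is no longer behind the one
     * recorded in ZooKeeper (or the ZooKeeper value is not positive), and
     * returns how many times it slept.
     */
    static long waitForCatchUp(LongSupplier localSeek, LongSupplier zkSeek, long sleepMs) {
        long sleepCount = 0;
        long seekTimestamp = localSeek.getAsLong();
        long zkSeekTimestamp = zkSeek.getAsLong();
        // Same loop condition as in the MasterElection excerpt
        while (seekTimestamp < zkSeekTimestamp && zkSeekTimestamp > 0) {
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting to catch up", e);
            }
            seekTimestamp = localSeek.getAsLong();
            zkSeekTimestamp = zkSeek.getAsLong();
            sleepCount++;
        }
        return sleepCount;
    }

    public static void main(String[] args) {
        // Assumption for the demo: each read of the local timestamp advances it by one
        AtomicLong local = new AtomicLong(97);
        long sleeps = waitForCatchUp(local::getAndIncrement, () -> 100L, 1L);
        System.out.println("slept " + sleeps + " time(s) before becoming master");
    }
}
```

With the demo values the local timestamp is read as 97, 98, 99, and 100, so the program prints "slept 3 time(s) before becoming master". When the ZooKeeper-side supplier returns 0, the loop body never runs and the method returns 0 immediately.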

ChronosStartup

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/ChronosStartup.java

public class ChronosStartup {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChronosStartup.class);

    private CountDownLatch waitForShutdown;
    private String configFilePath = "chronos.yaml";

    private PullWorker pullWorker;
    private PushWorker pushWorker;
    private DeleteBgWorker deleteBgWorker;
    private NettyHttpServer nettyHttpServer;

    ChronosStartup(final String configFilePath) {
        if (StringUtils.isNotBlank(configFilePath)) {
            this.configFilePath = configFilePath;
        }
    }

    public void start() throws Exception {
        LOGGER.info("start to launch chronos...");
        final long start = System.currentTimeMillis();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    LOGGER.info("start to stop chronos...");
                    final long start = System.currentTimeMillis();
                    ChronosStartup.this.stop();
                    final long cost = System.currentTimeMillis() - start;
                    LOGGER.info("succ stop chronos, cost:{}ms", cost);
                } catch (Exception e) {
                    LOGGER.error("error while shutdown chronos, err:{}", e.getMessage(), e);
                } finally {
                    /* shutdown log4j2 */
                    LogManager.shutdown();
                }
            }
        });

        /* Note: the initialization steps below must run in this order */

        /* init config */
        ConfigManager.initConfig(configFilePath);

        /* init metrics */
        if (!MetricService.init()) {
            System.exit(-1);
        }

        /* init rocksdb */
        RDB.init(ConfigManager.getConfig().getDbConfig().getDbPath());

        /* init zk */
        ZkUtils.init();

        /* init seektimestamp */
        MetaService.load();

        waitForShutdown = new CountDownLatch(1);

        if (ConfigManager.getConfig().isStandAlone()) {
            /* standalone */
            MasterElection.standAlone();
        } else {
            /* cluster mode: master election */
            MasterElection.election(waitForShutdown);
        }

        /* init pull worker */
        if (ConfigManager.getConfig().isPullOn()) {
            pullWorker = PullWorker.getInstance();
            pullWorker.start();
        }

        /* init push worker */
        if (ConfigManager.getConfig().isPushOn()) {
            pushWorker = PushWorker.getInstance();
            pushWorker.start();
        }

        /* init delete worker */
        if (ConfigManager.getConfig().isDeleteOn()) {
            deleteBgWorker = DeleteBgWorker.getInstance();
            deleteBgWorker.start();
        }

        final long cost = System.currentTimeMillis() - start;
        LOGGER.info("succ start chronos, cost:{}ms", cost);

        /* init http server */
        nettyHttpServer = NettyHttpServer.getInstance();
        nettyHttpServer.start();

        waitForShutdown.await();
    }

    void stop() {
        /* shutdown netty http server */
        if (nettyHttpServer != null) {
            nettyHttpServer.shutdown();
        }

        /* stop pull from MQ */
        if (pullWorker != null) {
            pullWorker.stop();
        }

        /* stop push to MQ */
        if (pushWorker != null) {
            pushWorker.stop();
        }

        /* stop delete */
        if (deleteBgWorker != null) {
            deleteBgWorker.stop();
        }

        MqConsumeStatService.getInstance().stop();

        /* close zk client */
        ZkUtils.close();

        /* close rocksdb */
        RDB.close();

        if (waitForShutdown != null) {
            waitForShutdown.countDown();
            waitForShutdown = null;
        }
    }
}

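  • ChronosStartup's start method creates the waitForShutdown latch. In non-standalone mode it calls MasterElection.election(waitForShutdown); in standalone mode it calls MasterElection.standAlone(), which sets state to MASTERING directly. The stop method counts the latch down, which unblocks both start and the cdl.await() call inside takeLeadership.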
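How the shared latch ties the leadership period to shutdown can be sketched with plain threads. This is an illustrative sketch under stated assumptions, not the chronos implementation: LatchHandoffSketch, runOnce, and the becameMaster latch are hypothetical names introduced here, and an ordinary thread stands in for the Curator thread that would invoke takeLeadership.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch: one latch keeps the "leader" in MASTERING until shutdown. */
class LatchHandoffSketch {

    enum ServerState { MASTERING, BACKUPING }

    private static volatile ServerState state = ServerState.BACKUPING;

    /** Returns { state while leadership was held, state after the latch was released }. */
    static ServerState[] runOnce() {
        state = ServerState.BACKUPING;
        final CountDownLatch waitForShutdown = new CountDownLatch(1);
        // Demo-only signal so the caller knows the leader thread has taken over
        final CountDownLatch becameMaster = new CountDownLatch(1);

        Thread leader = new Thread(() -> {
            state = ServerState.MASTERING;
            becameMaster.countDown();
            try {
                waitForShutdown.await(); // hold leadership until shutdown is requested
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            state = ServerState.BACKUPING; // returning here is what gives up leadership
        }, "leader-sketch");
        leader.start();

        try {
            becameMaster.await();
            ServerState whileHeld = state;
            waitForShutdown.countDown(); // the step stop() performs at the end
            leader.join();
            return new ServerState[] { whileHeld, state };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the sketch", e);
        }
    }

    public static void main(String[] args) {
        ServerState[] observed = runOnce();
        System.out.println("while leadership was held: " + observed[0]
                + ", after shutdown: " + observed[1]);
    }
}
```

Because the leader thread blocks on the same latch that shutdown releases, the state stays MASTERING for the whole lifetime of the process and only drops back to BACKUPING once countDown has been called.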

Summary

MasterElection exposes the election, isMaster, isBackup, standAlone, and getState methods. The election method is built on LeaderSelector from curator recipes: takeLeadership waits in a while loop until seekTimestamp is greater than or equal to zkSeekTimestamp, sets state to ServerState.MASTERING, blocks on the CountDownLatch's await method, and finally sets state back to ServerState.BACKUPING, releasing leadership. After creating the LeaderSelector, election calls its autoRequeue and start methods.

doc

  • carrera-chronos
  • A Look at curator recipes' LeaderLatch
