聊聊chronos的DeleteBgWorker

编程

本文主要研究一下chronos的DeleteBgWorker

DeleteBgWorker

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/workers/DeleteBgWorker.java

/**
 * Singleton background worker that periodically asks {@link RDB} to delete a range of keys
 * from the default column family.
 *
 * <p>Each run deletes the key range from {@link #MIN_TIMESTAMP} up to the current seek timestamp
 * (as reported by {@code MetaService.getSeekTimestamp()}) minus the configured number of hours
 * to retain. Keys are the decimal string form of a timestamp in seconds, encoded as UTF-8.
 */
public class DeleteBgWorker {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeleteBgWorker.class);

    private static final DeleteConfig DELETE_CONFIG = ConfigManager.getConfig().getDeleteConfig();

    /** Number of hours of data kept behind the seek timestamp. */
    private static final int SAVE_HOURS_OF_DATA = DELETE_CONFIG.getSaveHours();

    private static final long INITIAL_DELAY_MINUTES = 1; // 1 minute

    private static final long PERIOD_MINUTES = 10; // 10 minutes

    private static volatile DeleteBgWorker instance = null;

    // SimpleDateFormat is not thread-safe. It is only used in deleteRange(), which is private and
    // invoked solely from the single-threaded SCHEDULE executor below.
    private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Lower bound of the delete range, in epoch seconds.
     * Original note: 2017/10/13 00:00:00 (this matches the value when read as UTC+8).
     */
    private static final long MIN_TIMESTAMP = 1507824000;

    private static final ScheduledExecutorService SCHEDULE = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("delete-bg-worker-schedule-%d").daemon(true).build());

    private DeleteBgWorker() {
    }

    /**
     * Registers the periodic delete task: first run after {@link #INITIAL_DELAY_MINUTES},
     * then at a fixed rate of {@link #PERIOD_MINUTES}.
     */
    public void start() {
        SCHEDULE.scheduleAtFixedRate(() -> {
            // A periodic task that throws is never rescheduled by the executor, so every failure
            // is caught and logged here to keep later runs alive.
            try {
                final long seekTimestampInSecond = MetaService.getSeekTimestamp();
                // Long arithmetic: avoids int overflow for large retention values.
                final long endTimestampInSecond =
                        seekTimestampInSecond - TimeUnit.HOURS.toSeconds(SAVE_HOURS_OF_DATA);

                // Nothing to delete when the end of the range is not after its beginning, e.g. when
                // the seek timestamp still has its initial value of -1.
                if (endTimestampInSecond <= MIN_TIMESTAMP) {
                    LOGGER.warn("skip deleteRange, endTimestamp:{} is not after beginTimestamp:{}, seekTimestamp:{}",
                            endTimestampInSecond, MIN_TIMESTAMP, seekTimestampInSecond);
                    return;
                }

                final byte[] beginKey = String.valueOf(MIN_TIMESTAMP).getBytes(Charsets.UTF_8);
                final byte[] endKey = String.valueOf(endTimestampInSecond).getBytes(Charsets.UTF_8);
                deleteRange(beginKey, endKey);
            } catch (Exception e) {
                LOGGER.error("error while running delete task, err:{}", e.getMessage(), e);
            }
        }, INITIAL_DELAY_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES);

        LOGGER.info("DeleteBgWorker has started, initialDelayInMinutes:{}", INITIAL_DELAY_MINUTES);
    }

    /**
     * Deletes the given key range from the default column family and logs the elapsed time.
     *
     * @param beginKey UTF-8 bytes of the decimal begin timestamp (seconds)
     * @param endKey   UTF-8 bytes of the decimal end timestamp (seconds)
     */
    private void deleteRange(final byte[] beginKey, final byte[] endKey) {
        // Decode with the same charset the keys were encoded with, not the platform default.
        final String beginKeyStr = new String(beginKey, Charsets.UTF_8);
        final String endKeyStr = new String(endKey, Charsets.UTF_8);

        LOGGER.info("deleteRange start, beginKey:{}, endKey:{}", beginKeyStr, endKeyStr);

        final long start = System.currentTimeMillis();
        final boolean succeeded = RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey);
        if (!succeeded) {
            LOGGER.warn("deleteRange failed, beginKey:{}, endKey:{}", beginKeyStr, endKeyStr);
        }

        // Keys are in seconds; the formatter expects milliseconds.
        LOGGER.info("deleteRange end, beginKey:{}({}), endKey:{}({}), cost:{}ms",
                beginKeyStr, formatter.format(Long.parseLong(beginKeyStr) * 1000),
                endKeyStr, formatter.format(Long.parseLong(endKeyStr) * 1000),
                System.currentTimeMillis() - start);
    }

    /**
     * Shuts the scheduler down and waits until it has terminated.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * thread's interrupt status is restored.
     */
    public void stop() {
        SCHEDULE.shutdownNow();
        try {
            // isShutdown() is already true right after shutdownNow(); awaitTermination() is what
            // actually reports whether the running task has finished.
            while (!SCHEDULE.awaitTermination(1, TimeUnit.SECONDS)) {
                LOGGER.info("DeleteBgWorker is shutting down!");
            }
        } catch (InterruptedException e) {
            LOGGER.info("DeleteBgWorker was forced to shutdown, err:{}", e.getMessage(), e);
            Thread.currentThread().interrupt();
        }
        LOGGER.info("DeleteBgWorker was shutdown!");
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the single {@code DeleteBgWorker} instance
     */
    public static DeleteBgWorker getInstance() {
        if (instance == null) {
            synchronized (DeleteBgWorker.class) {
                if (instance == null) {
                    instance = new DeleteBgWorker();
                }
            }
        }
        return instance;
    }
}

  • DeleteBgWorker提供了静态方法getInstance来获取或创建单例,该类提供了start、stop两个方法;start方法会往SCHEDULE注册一个调度任务,首次延迟INITIAL_DELAY_MINUTES(1分钟),之后每隔PERIOD_MINUTES(10分钟)执行一次;该任务以MIN_TIMESTAMP作为beginKey,用MetaService.getSeekTimestamp()减去SAVE_HOURS_OF_DATA对应的秒数计算endKey,然后调用deleteRange方法;deleteRange主要是执行RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey)并记录耗时;stop方法则是关闭SCHEDULE

deleteFilesInRange

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/RDB.java

/**
 * Static access point to the RocksDB instance (excerpt; parts of the class are elided with
 * {@code //......}).
 */
public class RDB {

    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(RDB.class);

    static RocksDB DB;

    //......

    /**
     * Deletes the key range between {@code beginKey} and {@code endKey} in the given column
     * family by delegating to {@code DB.deleteRange}.
     *
     * @param cfh      column family to delete from
     * @param beginKey first key of the range
     * @param endKey   end key of the range
     * @return {@code true} if the call completed, {@code false} if a {@code RocksDBException}
     *         was thrown (the exception is logged, not propagated)
     */
    public static boolean deleteFilesInRange(final ColumnFamilyHandle cfh, final byte[] beginKey,
                                             final byte[] endKey) {
        try {
            DB.deleteRange(cfh, beginKey, endKey);

            // Skip building the log arguments entirely when debug logging is off.
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("succ delete range, columnFamilyHandle:{}, beginKey:{}, endKey:{}",
                        cfh.toString(),
                        new String(beginKey, java.nio.charset.StandardCharsets.UTF_8),
                        new String(endKey, java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (RocksDBException e) {
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            // NOTE(review): assumes keys are UTF-8 text, as produced by DeleteBgWorker — confirm for other callers.
            LOGGER.error("error while delete range, columnFamilyHandle:{}, beginKey:{}, endKey:{}, err:{}",
                    cfh.toString(),
                    new String(beginKey, java.nio.charset.StandardCharsets.UTF_8),
                    new String(endKey, java.nio.charset.StandardCharsets.UTF_8),
                    e.getMessage(), e);
            return false;
        }
        return true;
    }

    //......

}

  • deleteFilesInRange方法主要是执行DB.deleteRange(cfh, beginKey, endKey)

MetaService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MetaService.java

/**
 * Holds the in-memory seek timestamp and the ZooKeeper-side offsets, and keeps the seek
 * timestamp persisted to a local file and (on the master) uploaded to ZooKeeper.
 */
public class MetaService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MetaService.class);

    /** Current seek timestamp; -1 until {@link #load()} has populated it. */
    private static volatile long seekTimestamp = -1;

    private static volatile long zkSeekTimestamp = -1;

    private static volatile Map<String, Long> zkQidOffsets = new ConcurrentHashMap<>();

    private static final DbConfig dbConfig = ConfigManager.getConfig().getDbConfig();

    private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("offset-seektimestamp-schedule-%d").daemon(true).build());

    /**
     * Loads the seek timestamp from file if it has not been set yet, then schedules the
     * periodic upload task (initial delay 5 seconds, 1 second between runs).
     */
    public static void load() {
        final long start = System.currentTimeMillis();
        if (seekTimestamp == -1) {
            seekTimestamp = loadSeekTimestampFromFile();
        }
        final long cost = System.currentTimeMillis() - start;
        LOGGER.info("succ load seekTimestamp, seekTimestamp:{}, cost:{}ms", seekTimestamp, cost);

        SCHEDULER.scheduleWithFixedDelay(() -> {
            // A periodic task that throws is never rescheduled by the executor, so failures are
            // caught and logged to keep subsequent uploads running.
            try {
                // If this node is the master, fetch and report the zk offsets and the seek offset.
                if (MasterElection.isMaster()) {
                    MqConsumeStatService.getInstance().uploadOffsetsToZk();
                    uploadSeekTimestampToZk();
                }
            } catch (Exception e) {
                LOGGER.error("error while uploading offsets and seekTimestamp to zk, err:{}", e.getMessage(), e);
            }
        }, 5, 1, TimeUnit.SECONDS);
    }

    /**
     * Reads the seek timestamp from the configured file. When the file content is blank, a new
     * timestamp from {@code TsUtils.genTS()} is written to the file and returned instead.
     *
     * @return the seek timestamp read from file, or the newly generated one
     * @throws NumberFormatException if the file content is non-blank but not a valid long
     */
    private static long loadSeekTimestampFromFile() {
        final String seekTimestampStr = FileIOUtils.readFile2String(dbConfig.getSeekTimestampPath());

        if (StringUtils.isBlank(seekTimestampStr)) {
            final long initSeekTimestamp = TsUtils.genTS();
            final boolean result = FileIOUtils.writeFileFromString(
                    dbConfig.getSeekTimestampPath(), String.valueOf(initSeekTimestamp));
            if (result) {
                LOGGER.info("init seekTimestamp and succ save, current seekTimestamp:{}", initSeekTimestamp);
            } else {
                LOGGER.error("init seekTimestamp and fail to save, current seekTimestamp:{}", initSeekTimestamp);
            }
            return initSeekTimestamp;
        }

        // Parse once; trim first so surrounding whitespace (e.g. a trailing newline) does not
        // make Long.parseLong fail.
        final long loadedSeekTimestamp = Long.parseLong(seekTimestampStr.trim());
        LOGGER.info("succ load seekTimestamp from file, seekTimestamp:{}", loadedSeekTimestamp);
        return loadedSeekTimestamp;
    }

    /**
     * @return the current in-memory seek timestamp
     */
    public static long getSeekTimestamp() {
        return seekTimestamp;
    }

    /**
     * Increments the seek timestamp by one and writes the new value to the seek-timestamp file.
     *
     * <p>The lock taken here must not be removed. (The original note continues with
     * "when judging whether a message has timed out", but appears to be truncated.)
     */
    public static void nextSeekTimestamp() {
        Batcher.lock.lock();
        try {
            seekTimestamp++;
            final boolean result = FileIOUtils.writeFileFromString(
                    dbConfig.getSeekTimestampPath(), String.valueOf(seekTimestamp));
            if (result) {
                LOGGER.info("incr seekTimestamp and succ save, next seekTimestamp:{}", seekTimestamp);
            } else {
                LOGGER.error("incr seekTimestamp and fail to save, next seekTimestamp:{}", seekTimestamp);
            }
        } finally {
            Batcher.lock.unlock();
        }
    }

    /**
     * Writes the current seek timestamp to ZooKeeper at {@code Constants.SEEK_TIMESTAMP_ZK_PATH}.
     */
    public static void uploadSeekTimestampToZk() {
        final String seekTimestampStr = String.valueOf(MetaService.getSeekTimestamp());
        ZkUtils.createOrUpdateValue(Constants.SEEK_TIMESTAMP_ZK_PATH, seekTimestampStr);
        LOGGER.debug("upload seekTimestamp to zk, seekTimestamp:{}", seekTimestampStr);
    }

    /**
     * @return the currently stored map of qid offsets
     */
    public static Map<String, Long> getZkQidOffsets() {
        return zkQidOffsets;
    }

    /**
     * @param zkQidOffsets the map that replaces the currently stored qid offsets
     */
    public static void setZkQidOffsets(Map<String, Long> zkQidOffsets) {
        MetaService.zkQidOffsets = zkQidOffsets;
    }

    /**
     * @return the currently stored ZooKeeper seek timestamp; -1 if it has never been set
     */
    public static long getZkSeekTimestamp() {
        return zkSeekTimestamp;
    }

    /**
     * @param zkSeekTimestamp the new ZooKeeper seek timestamp value
     */
    public static void setZkSeekTimestamp(long zkSeekTimestamp) {
        MetaService.zkSeekTimestamp = zkSeekTimestamp;
    }
}

  • MetaService提供了load、getSeekTimestamp、nextSeekTimestamp、uploadSeekTimestampToZk等方法;load方法在seekTimestamp为-1时执行loadSeekTimestampFromFile,之后通过scheduleWithFixedDelay注册一个定时任务(首次延迟5秒,之后每次执行结束后间隔1秒),该任务判断是否是master,如果是则执行MqConsumeStatService.getInstance().uploadOffsetsToZk()以及uploadSeekTimestampToZk;nextSeekTimestamp方法会在持有Batcher.lock的情况下先递增内存的seekTimestamp,然后使用FileIOUtils.writeFileFromString将其值写入文件

小结

DeleteBgWorker提供了静态方法getInstance来获取或创建单例,该类提供了start、stop两个方法;start方法会往SCHEDULE注册一个调度任务,首次延迟INITIAL_DELAY_MINUTES,之后每隔PERIOD_MINUTES执行一次;该任务先从MetaService获取seekTimestamp来计算endKey,再调用deleteRange方法;deleteRange主要是执行RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey);stop方法则是关闭SCHEDULE

doc

  • carrera-chronos

以上是 聊聊chronos的DeleteBgWorker 的全部内容, 来源链接: utcz.com/z/512478.html

回到顶部