聊聊chronos的addMessage

编程

本文主要研究一下chronos的addMessage

addMessage

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

public class MqPullService implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);

private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();

private static final Batcher BATCHER = Batcher.getInstance();

private volatile boolean shouldStop = false;

private CountDownLatch cdl;

private final List<Long> succOffsets = new ArrayList<>();

private final List<Long> failOffsets = new ArrayList<>();

private SimpleCarreraConsumer carreraConsumer;

private String mqPullServiceName;

private final int INTERNAL_PAIR_COUNT = 5000;

private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);

//......

private void addMessage(final InternalKey internalKey, final InternalValue internalValue, final int action) {

if (internalKey.getType() == MsgTypes.DELAY.getValue()) {

MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY);

if (BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)) {

MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY, MetricMsgToOrFrom.DB);

return;

}

MetricService.putMsgSizePercent(internalValue.getTopic(), internalValue.toJsonString().getBytes(Charsets.UTF_8).length);

MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY, MetricMsgToOrFrom.SEND);

putToBlockingQueue(internalKey, internalValue);

} else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()

|| internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {

MetricMsgType msgType = MetricMsgType.LOOP_DELAY;

if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {

msgType = MetricMsgType.LOOP_EXPONENT_DELAY;

}

MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType);

while (true) {

if (KeyUtils.isInvalidMsg(internalKey)) {

return;

}

// 循环消息只写入rocksdb一次, seek到的时候再进行添加

if (BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)) {

MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType, MetricMsgToOrFrom.DB);

return;

}

MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType, MetricMsgToOrFrom.SEND);

putToBlockingQueue(new InternalKey(internalKey), internalValue);

internalKey.nextUniqDelayMsgId();

}

} else {

MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.UNKNOWN);

LOGGER.error("should not go here, invalid message type:{}, internalKey:{}", internalKey.getType(),

internalKey.genUniqDelayMsgId());

}

}

private void putToBlockingQueue(InternalKey internalKey, InternalValue internalValue) {

try {

blockingQueue.put(new InternalPair(internalKey, internalValue));

} catch (InterruptedException e) {

LOGGER.error("error while put to blockingQueue, dMsgId:{}", internalKey.genUniqDelayMsgId());

}

}

//......

}

  • MqPullService实现了Runnable接口,其addMessage方法执行BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

public class InternalKey {

private static final String SEPARATOR = "-";

private static final int LEN_UUID = 36;

private static final long ONE_DAY_SECONDS = 24 * 60 * 60;

private long timestamp;

private int type;

private long expire;

private long times;

private long timed;

private long interval;

private int innerTopicSeq;

private String uuid;

private int segmentNum;

private int segmentIndex;

//......

}

  • InternalKey定义了timestamp、times、interval等属性

InternalValue

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/InternalValue.java

public class InternalValue {

@JSONField(name="a")

private String topic;

@JSONField(name="b")

private String body;

@JSONField(name="c")

private long createTime;

@JSONField(name="d")

private String tags;

@JSONField(name="e")

private Map<String, String> properties;

//......

}

  • InternalValue定义了topic、body、createTime、tags、properties属性

checkAndPutToDefaultCF

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

public class Batcher {

private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);

private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();

private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();

private WriteBatch wb = new WriteBatch();

private volatile int itemNum = 0;

private static volatile Batcher instance = null;

public static volatile ReentrantLock lock = new ReentrantLock();

//......

public boolean checkAndPutToDefaultCF(final InternalKey internalKey, final String strVal, final String topic, final int action) {

lock.lock();

try {

if (KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {

byte[] bytes = strVal.getBytes(Charsets.UTF_8);

MetricService.putMsgSizePercent(topic, bytes.length);

if (bytes.length <= MSG_BYTE_BASE_LEN) {

putToDefaultCF(internalKey.genUniqDelayMsgId(), bytes, topic, internalKey, action);

} else {

// 如果字节数据超过一定长度, 则进行字节数组切分, 以便降低io.util

List<byte[]> list = ByteUtils.divideArray(bytes, MSG_BYTE_BASE_LEN);

final int segmentNum = list.size();

for (int segmentIndex = 0; segmentIndex < segmentNum; segmentIndex++) {

internalKey.setSegmentNum(segmentNum);

internalKey.setSegmentIndex(Constants.SEGMENT_INDEX_BASE + segmentIndex);

putToDefaultCF(internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), list.get(segmentIndex), topic, internalKey, action);

LOGGER.info("segment split, dMsgId:{}, len:{}, value.totalLen:{}", internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), list.get(segmentIndex).length, bytes.length);

}

}

return true;

}

return false;

} finally {

lock.unlock();

}

}

public void putToDefaultCF(final String key, final byte[] value, final String topic, final InternalKey internalKey, final int action) {

put(CFManager.CFH_DEFAULT, key.getBytes(Charsets.UTF_8), value, topic, internalKey, action);

}

private void put(final ColumnFamilyHandle cfh, final byte[] key, final byte[] value, final String topic, final InternalKey internalKey, final int action) {

lock.lock();

try {

int len = 0;

if (value != null) {

len = value.length;

}

wb.put(cfh, key, value);

LOGGER.info("put to cf, dMsgId:{}, len:{}", new String(key), len);

itemNum++;

checkFrequency();

if (action == Actions.ADD.getValue()) {

if (internalKey.getType() == MsgTypes.DELAY.getValue()) {

MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.DELAY);

} else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {

MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.LOOP_DELAY);

} else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {

MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.LOOP_EXPONENT_DELAY);

}

} else {

if (internalKey.getType() == MsgTypes.DELAY.getValue()) {

MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY, MetricMsgToOrFrom.DB);

MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);

} else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {

MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY, MetricMsgToOrFrom.DB);

MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);

} else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {

MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY, MetricMsgToOrFrom.DB);

MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);

}

}

} finally {

lock.unlock();

}

}

private void checkFrequency() {

if (itemNum >= PULL_BATCH_ITEM_NUM) {

flush();

}

}

public void flush() {

lock.lock();

try {

if (itemNum > 0) {

// make sure write succ

while (!RDB.writeSync(wb)) {

LOGGER.error("error while flush to db!");

try {

TimeUnit.MILLISECONDS.sleep(200);

} catch (InterruptedException e) {

}

}

wb.clear();

itemNum = 0;

}

} finally {

lock.unlock();

}

}

//......

}

  • Batcher的checkAndPutToDefaultCF主要是执行putToDefaultCF,而putToDefaultCF主要是执行put方法,该方法会执行wb.put(cfh, key, value),将数据写入到rocksdb的WriteBatch;之后执行checkFrequency在必要的时候进行flush;flush方法主要是执行RDB.writeSync(wb)以及wb.clear()

writeSync

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/RDB.java

public class RDB {

private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(RDB.class);

static RocksDB DB;

public static void init(final String dbPath) {

try {

final long start = System.currentTimeMillis();

boolean result = FileIOUtils.createOrExistsDir(new File(dbPath));

assert(result != false);

result = FileIOUtils.createOrExistsDir(new File(DB_PATH_BACKUP));

assert(result != false);

result = FileIOUtils.createOrExistsDir(new File(DB_PATH_RESTORE));

assert(result != false);

DB = RocksDB.open(OptionsConfig.DB_OPTIONS, dbPath, CF_DESCRIPTORS, CF_HANDLES);

assert (DB != null);

initCFManger(CF_HANDLES);

final long cost = System.currentTimeMillis() - start;

LOGGER.info("succ open rocksdb, path:{}, cost:{}ms", dbPath, cost);

} catch (RocksDBException e) {

LOGGER.error("error while open rocksdb, path:{}, err:{}", dbPath, e.getMessage(), e);

}

}

//......

public static boolean writeSync(final WriteBatch writeBatch) {

return write(OptionsConfig.WRITE_OPTIONS_SYNC, writeBatch);

}

private static boolean write(final WriteOptions writeOptions, final WriteBatch writeBatch) {

try {

DB.write(writeOptions, writeBatch);

LOGGER.debug("succ write writeBatch, size:{}", writeBatch.count());

} catch (RocksDBException e) {

// TODO: 2017/11/8 上报写入失败

LOGGER.error("error while write batch, err:{}", e.getMessage(), e);

return false;

}

return true;

}

//......

}

  • writeSync方法主要是使用OptionsConfig.WRITE_OPTIONS_SYNC参数执行RocksDB.write方法

小结

MqPullService实现了Runnable接口,其addMessage方法执行BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action);Batcher的checkAndPutToDefaultCF主要是执行putToDefaultCF,而putToDefaultCF主要是执行put方法,该方法会执行wb.put(cfh, key, value),将数据写入到rocksdb的WriteBatch;之后执行checkFrequency在必要的时候进行flush;flush方法主要是执行RDB.writeSync(wb)以及wb.clear()

doc

  • carrera-chronos

以上是 聊聊chronos的addMessage 的全部内容, 来源链接: utcz.com/z/512308.html

回到顶部