聊聊chronos的cancelMessage

编程

本文主要研究一下chronos的cancelMessage

MqPullService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

/**
 * Runnable pull service (excerpt; members elided by the article are marked with {@code //......}).
 * The part shown here handles cancellation of a previously stored message.
 */
public class MqPullService implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);

    private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();

    /** Shared batcher through which cancel (tombstone) records are written. */
    private static final Batcher BATCHER = Batcher.getInstance();

    private volatile boolean shouldStop = false;

    private CountDownLatch cdl;

    private final List<Long> succOffsets = new ArrayList<>();

    private final List<Long> failOffsets = new ArrayList<>();

    private SimpleCarreraConsumer carreraConsumer;

    private String mqPullServiceName;

    /** Capacity of {@link #blockingQueue}. */
    private final int INTERNAL_PAIR_COUNT = 5000;

    private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);

    //......

    /**
     * Records a cancel request for the message identified by {@code internalKey}.
     *
     * <p>A tombstone copy of the key is created first. DELAY messages are written straight to the
     * default column family; LOOP_DELAY and LOOP_EXPONENT_DELAY messages are delegated to
     * {@code Batcher.putLoopTombstoneKey}. Any other type is counted as UNKNOWN and logged as an error.
     *
     * @param internalKey key of the message to cancel
     * @param topic       topic the message belongs to
     * @param action      action code forwarded unchanged to the batcher
     */
    private void cancelMessage(final InternalKey internalKey, final String topic, final int action) {
        final InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();
        final int msgType = internalKey.getType();

        if (msgType == MsgTypes.DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);
            final String tombstoneId = tombStoneInternalKey.genUniqDelayMsgId();
            final String cancelPayload = new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString();
            BATCHER.putToDefaultCF(tombstoneId, cancelPayload, topic, tombStoneInternalKey, action);
            return;
        }

        if (msgType == MsgTypes.LOOP_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);
            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
            return;
        }

        if (msgType == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);
            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
            return;
        }

        // None of the cancellable types matched: count it and report it, but write nothing.
        MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);
        LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", msgType,
                internalKey.genUniqDelayMsgId());
    }

    //......

}

  • cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

/**
 * Key model for a chronos message (excerpt; constructors, accessors and other members are elided
 * by the article and marked with {@code //......}).
 */
public class InternalKey {

    private static final String SEPARATOR = "-";

    // NOTE(review): presumably the length of a textual UUID — confirm against its usages.
    private static final int LEN_UUID = 36;

    /** Number of seconds in one day. */
    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;

    private long timestamp;

    private int type;

    private long expire;

    private long times;

    private long timed;

    private long interval;

    private int innerTopicSeq;

    private String uuid;

    private int segmentNum;

    private int segmentIndex;

    //......

    /**
     * Creates a copy of this key whose type is {@code MsgTypes.TOMBSTONE}.
     * This instance is passed to the copy constructor; only the copy's type is set here.
     *
     * @return the new tombstone key
     */
    public InternalKey cloneTombstoneInternalKey() {
        final InternalKey copy = new InternalKey(this);
        copy.setType(MsgTypes.TOMBSTONE.getValue());
        return copy;
    }

    //......

}

  • cloneTombstoneInternalKey方法设置type为MsgTypes.TOMBSTONE.getValue()

CancelWrap

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java

public class CancelWrap {

private String uniqDelayMsgId;

private String topic;

public CancelWrap() {

}

public CancelWrap(String uniqDelayMsgId, String topic) {

this.uniqDelayMsgId = uniqDelayMsgId;

this.topic = topic;

}

public String getUniqDelayMsgId() {

return uniqDelayMsgId;

}

public void setUniqDelayMsgId(String uniqDelayMsgId) {

this.uniqDelayMsgId = uniqDelayMsgId;

}

public String getTopic() {

return topic;

}

public void setTopic(String topic) {

this.topic = topic;

}

public String toJsonString() {

return JsonUtils.toJsonString(this);

}

@Override

public String toString() {

return "CancelWrap{" +

"uniqDelayMsgId="" + uniqDelayMsgId + """ +

", topic="" + topic + """ +

"}";

}

}

  • CancelWrap定义了uniqDelayMsgId及topic两个属性

Batcher

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

/**
 * Write batcher (excerpt; members elided by the article are marked with {@code //......}).
 * The method shown here writes the tombstone record that cancels a looping message.
 */
public class Batcher {

    private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);

    private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();

    private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();

    private WriteBatch wb = new WriteBatch();

    private volatile int itemNum = 0;

    private static volatile Batcher instance = null;

    /** Lock held for the whole of {@link #putLoopTombstoneKey}. */
    public static volatile ReentrantLock lock = new ReentrantLock();

    //......

    /**
     * Writes a tombstone for a loop / exponential-loop message.
     *
     * <p>{@code internalKey} is first advanced with {@code nextUniqDelayMsgId()} until
     * {@code KeyUtils.afterSeekTimestamp} accepts its timestamp. The tombstone key then takes over
     * that occurrence's timestamp, times and timed values and, unless {@code KeyUtils.isInvalidMsg}
     * rejects it, is stored through {@code putToDefaultCF} together with a {@link CancelWrap}
     * naming the occurrence being cancelled.
     *
     * @param tombstoneInternalKey tombstone copy of the message key; mutated by this method
     * @param internalKey          key of the message to cancel; used as the starting occurrence
     * @param topic                topic of the message
     * @param action               action code forwarded unchanged to {@code putToDefaultCF}
     */
    public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) {
        lock.lock();
        try {
            // Sample keys.
            // Exponential loop:
            // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // Plain loop:
            // 1536811267-3-1536911267-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5

            // Step forward to the first occurrence whose timestamp passes afterSeekTimestamp.
            while (!KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {
                internalKey = internalKey.nextUniqDelayMsgId();
            }

            // Mirror that occurrence on the tombstone. In the sample keys above the fourth
            // segment stays at 3 while the fifth goes 0, 1, 2, so `times` is copied as-is
            // rather than derived from `timed`.
            tombstoneInternalKey.setTimestamp(internalKey.getTimestamp());
            tombstoneInternalKey.setTimes(internalKey.getTimes());
            tombstoneInternalKey.setTimed(internalKey.getTimed());

            if (!KeyUtils.isInvalidMsg(tombstoneInternalKey)) {
                // The record is stored under the tombstone's id, so the tombstone key is the key
                // object handed to putToDefaultCF (as the DELAY cancel path does).
                putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(),
                        new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombstoneInternalKey, action);
            }
        } finally {
            lock.unlock();
        }
    }

    //......

}

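  • putLoopTombstoneKey方法在持有lock的情况下,先循环调用internalKey.nextUniqDelayMsgId(),直到KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())返回true;然后为tombstoneInternalKey设置timestamp、times、timed,若KeyUtils.isInvalidMsg(tombstoneInternalKey)为false,则通过putToDefaultCF添加一条CancelWrap记录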

小结

cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

doc

  • carrera-chronos

以上是 聊聊chronos的cancelMessage 的全部内容, 来源链接: utcz.com/z/512437.html

回到顶部