聊聊rocketmq的ExpressionForRetryMessageFilter

编程

本文主要研究一下rocketmq的ExpressionForRetryMessageFilter

MessageFilter

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java

public interface MessageFilter {

/**

* match by tags code or filter bit map which is calculated when message received

* and stored in consume queue ext.

*

* @param tagsCode tagsCode

* @param cqExtUnit extend unit of consume queue

*/

boolean isMatchedByConsumeQueue(final Long tagsCode,

final ConsumeQueueExt.CqExtUnit cqExtUnit);

/**

* match by message content which are stored in commit log.

* <br>{@code msgBuffer} and {@code properties} are not all null.If invoked in store,

* {@code properties} is null;If invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null.

*

* @param msgBuffer message buffer in commit log, may be null if not invoked in store.

* @param properties message properties, should decode from buffer if null by yourself.

*/

boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,

final Map<String, String> properties);

}

  • MessageFilter定义了isMatchedByConsumeQueue、isMatchedByCommitLog方法

ExpressionMessageFilter

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java

public class ExpressionMessageFilter implements MessageFilter {

protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);

protected final SubscriptionData subscriptionData;

protected final ConsumerFilterData consumerFilterData;

protected final ConsumerFilterManager consumerFilterManager;

protected final boolean bloomDataValid;

public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,

ConsumerFilterManager consumerFilterManager) {

this.subscriptionData = subscriptionData;

this.consumerFilterData = consumerFilterData;

this.consumerFilterManager = consumerFilterManager;

if (consumerFilterData == null) {

bloomDataValid = false;

return;

}

BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();

if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {

bloomDataValid = true;

} else {

bloomDataValid = false;

}

}

@Override

public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {

if (null == subscriptionData) {

return true;

}

if (subscriptionData.isClassFilterMode()) {

return true;

}

// by tags code.

if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

if (tagsCode == null) {

return true;

}

if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {

return true;

}

return subscriptionData.getCodeSet().contains(tagsCode.intValue());

} else {

// no expression or no bloom

if (consumerFilterData == null || consumerFilterData.getExpression() == null

|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {

return true;

}

// message is before consumer

if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {

log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);

return true;

}

byte[] filterBitMap = cqExtUnit.getFilterBitMap();

BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();

if (filterBitMap == null || !this.bloomDataValid

|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {

return true;

}

BitsArray bitsArray = null;

try {

bitsArray = BitsArray.create(filterBitMap);

boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);

log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);

return ret;

} catch (Throwable e) {

log.error("bloom filter error, sub=" + subscriptionData

+ ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);

}

}

return true;

}

@Override

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {

if (subscriptionData == null) {

return true;

}

if (subscriptionData.isClassFilterMode()) {

return true;

}

if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

return true;

}

ConsumerFilterData realFilterData = this.consumerFilterData;

Map<String, String> tempProperties = properties;

// no expression

if (realFilterData == null || realFilterData.getExpression() == null

|| realFilterData.getCompiledExpression() == null) {

return true;

}

if (tempProperties == null && msgBuffer != null) {

tempProperties = MessageDecoder.decodeProperties(msgBuffer);

}

Object ret = null;

try {

MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

ret = realFilterData.getCompiledExpression().evaluate(context);

} catch (Throwable e) {

log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);

}

log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

if (ret == null || !(ret instanceof Boolean)) {

return false;

}

return (Boolean) ret;

}

}

  • ExpressionMessageFilter实现了MessageFilter接口,其isMatchedByConsumeQueue方法利用了bloomFilter进行判断;isMatchedByCommitLog方法主要是通过realFilterData.getCompiledExpression().evaluate(context)来获取结果

ExpressionForRetryMessageFilter

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java

public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {

public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,

ConsumerFilterManager consumerFilterManager) {

super(subscriptionData, consumerFilterData, consumerFilterManager);

}

@Override

public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {

if (subscriptionData == null) {

return true;

}

if (subscriptionData.isClassFilterMode()) {

return true;

}

boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);

if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {

return true;

}

ConsumerFilterData realFilterData = this.consumerFilterData;

Map<String, String> tempProperties = properties;

boolean decoded = false;

if (isRetryTopic) {

// retry topic, use original filter data.

// poor performance to support retry filter.

if (tempProperties == null && msgBuffer != null) {

decoded = true;

tempProperties = MessageDecoder.decodeProperties(msgBuffer);

}

String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);

String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());

realFilterData = this.consumerFilterManager.get(realTopic, group);

}

// no expression

if (realFilterData == null || realFilterData.getExpression() == null

|| realFilterData.getCompiledExpression() == null) {

return true;

}

if (!decoded && tempProperties == null && msgBuffer != null) {

tempProperties = MessageDecoder.decodeProperties(msgBuffer);

}

Object ret = null;

try {

MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

ret = realFilterData.getCompiledExpression().evaluate(context);

} catch (Throwable e) {

log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);

}

log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

if (ret == null || !(ret instanceof Boolean)) {

return false;

}

return (Boolean) ret;

}

}

  • ExpressionForRetryMessageFilter继承了ExpressionMessageFilter,它覆盖了isMatchedByCommitLog方法,里头会使用subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)来判断是否是isRetryTopic;对于retryTopic会使用tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC)来获取realTopic,从而根据consumerFilterManager.get(realTopic, group)获取realFilterData;最后通过realFilterData.getCompiledExpression().evaluate(context)来获取结果

小结

MessageFilter定义了isMatchedByConsumeQueue、isMatchedByCommitLog方法;ExpressionMessageFilter实现了MessageFilter接口,其isMatchedByConsumeQueue方法利用了bloomFilter进行判断;isMatchedByCommitLog方法主要是通过realFilterData.getCompiledExpression().evaluate(context)来获取结果;ExpressionForRetryMessageFilter继承了ExpressionMessageFilter,它覆盖了isMatchedByCommitLog方法,里头会使用subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)来判断是否是isRetryTopic

doc

  • ExpressionForRetryMessageFilter

以上是 聊聊rocketmq的ExpressionForRetryMessageFilter 的全部内容, 来源链接: utcz.com/z/512033.html

回到顶部