A Look at Artemis's lastValueProperty

CoreMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

public class CoreMessage extends RefCountMessage implements ICoreMessage {

    //......

    public SimpleString getLastValueProperty() {
        return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
    }

    @Override
    public Message setLastValueProperty(SimpleString lastValueName) {
        return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName);
    }

    //......
}

  • CoreMessage provides the getLastValueProperty and setLastValueProperty methods, which read and write the message's Message.HDR_LAST_VALUE_NAME (_AMQ_LVQ_NAME) property.
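
To make the accessor pair concrete, here is a minimal sketch that does not depend on Artemis. SketchMessage and its String-keyed property map are hypothetical stand-ins for CoreMessage and its typed properties; only the header name _AMQ_LVQ_NAME is taken from the source above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message; this is NOT the Artemis CoreMessage class.
final class SketchMessage {
    // The header name cited above for Message.HDR_LAST_VALUE_NAME.
    static final String HDR_LAST_VALUE_NAME = "_AMQ_LVQ_NAME";

    private final Map<String, String> properties = new HashMap<>();

    SketchMessage putStringProperty(String key, String value) {
        properties.put(key, value);
        return this;
    }

    String getStringProperty(String key) {
        return properties.get(key);
    }

    // The setter only writes the reserved header into the property map.
    SketchMessage setLastValueProperty(String lastValueName) {
        return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName);
    }

    // The getter only reads the same reserved header back.
    String getLastValueProperty() {
        return getStringProperty(HDR_LAST_VALUE_NAME);
    }
}

final class SketchMessageDemo {
    public static void main(String[] args) {
        SketchMessage m = new SketchMessage().setLastValueProperty("STOCK.ACME");
        // The value is visible as an ordinary property under the reserved name.
        System.out.println(m.getStringProperty("_AMQ_LVQ_NAME")); // prints STOCK.ACME
    }
}
```

The point of the sketch is that the last-value name is not a separate field: it is an ordinary message property stored under a reserved key.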

MessageReferenceImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java

public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {

    //......

    public SimpleString getLastValueProperty() {
        SimpleString lastValue = message.getSimpleStringProperty(queue.getLastValueKey());
        if (lastValue == null) {
            lastValue = message.getLastValueProperty();
        }
        return lastValue;
    }

    //......
}

  • MessageReferenceImpl provides getLastValueProperty, which first reads the message property named by queue.getLastValueKey(); if that value is null, it falls back to message.getLastValueProperty().
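
The two-step lookup can be sketched without Artemis as follows. LastValueLookup, resolve, and the plain String map are hypothetical names for illustration, and treating a null queue key as "no custom key configured" is an assumption of this sketch rather than something the source confirms.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that mimics the lookup order; not part of Artemis.
final class LastValueLookup {
    static final String DEFAULT_HEADER = "_AMQ_LVQ_NAME";

    // queueLastValueKey may be null here (assumption: the queue has no custom key).
    static String resolve(Map<String, String> messageProps, String queueLastValueKey) {
        // Step 1: try the property named by the queue's own last-value key.
        String lastValue = queueLastValueKey == null ? null : messageProps.get(queueLastValueKey);
        // Step 2: fall back to the reserved header set via setLastValueProperty.
        if (lastValue == null) {
            lastValue = messageProps.get(DEFAULT_HEADER);
        }
        return lastValue;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(DEFAULT_HEADER, "from-header");
        System.out.println(resolve(props, "symbol")); // prints from-header
        props.put("symbol", "ACME");
        System.out.println(resolve(props, "symbol")); // prints ACME
    }
}
```

In this sketch the queue-level key always wins when the message carries it, and the reserved header only matters when it does not.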

LastValueQueue

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java

public class LastValueQueue extends QueueImpl {

    private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();

    private final SimpleString lastValueKey;

    //......

    @Override
    public synchronized void addTail(final MessageReference ref, final boolean direct) {
        if (scheduleIfPossible(ref)) {
            return;
        }
        final SimpleString prop = ref.getLastValueProperty();
        if (prop != null) {
            HolderReference hr = map.get(prop);
            if (hr != null) {
                // We need to overwrite the old ref with the new one and ack the old one
                replaceLVQMessage(ref, hr);
            } else {
                hr = new HolderReference(prop, ref);
                map.put(prop, hr);
                super.addTail(hr, direct);
            }
        } else {
            super.addTail(ref, direct);
        }
    }

    @Override
    public synchronized void addHead(final MessageReference ref, boolean scheduling) {
        // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay
        if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
            return;
        }
        SimpleString lastValueProp = ref.getLastValueProperty();
        if (lastValueProp != null) {
            HolderReference hr = map.get(lastValueProp);
            if (hr != null) {
                if (scheduling) {
                    // We need to overwrite the old ref with the new one and ack the old one
                    replaceLVQMessage(ref, hr);
                } else {
                    // We keep the current ref and ack the one we are returning
                    super.referenceHandled(ref);
                    try {
                        super.acknowledge(ref);
                    } catch (Exception e) {
                        ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
                    }
                }
            } else {
                hr = new HolderReference(lastValueProp, ref);
                map.put(lastValueProp, hr);
                super.addHead(hr, scheduling);
            }
        } else {
            super.addHead(ref, scheduling);
        }
    }

    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
        MessageReference oldRef = hr.getReference();
        referenceHandled(oldRef);
        super.refRemoved(oldRef);
        try {
            oldRef.acknowledge(null, AckReason.REPLACED, null);
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
        }
        hr.setReference(ref);
        addRefSize(ref);
        refAdded(ref);
    }

    //......
}

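  • LastValueQueue extends QueueImpl. Its addTail and addHead methods first obtain the last-value property via ref.getLastValueProperty(), then look up the matching HolderReference in the map. If none exists, a new HolderReference is created, stored in the map, and added to the queue. If one exists, addTail (and addHead when scheduling is true) calls replaceLVQMessage, which swaps the new ref into the holder and acknowledges oldRef with AckReason.REPLACED; addHead with scheduling set to false instead keeps the current ref and acknowledges the ref being returned.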
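
The holder-based replacement on the addTail path can be illustrated with a small standalone sketch. SketchLastValueQueue, Holder, and replacedPayloads are hypothetical names; payloads are plain strings, scheduling and addHead are left out, and removing the key from the map on poll is a simplification of this sketch, not a description of when Artemis clears its map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a last-value queue; not the Artemis class.
final class SketchLastValueQueue {
    // Mutable holder: it keeps its queue position while its payload is swapped.
    private static final class Holder {
        final String key;
        String payload;

        Holder(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    private final LinkedList<Holder> queue = new LinkedList<>();
    private final Map<String, Holder> byKey = new HashMap<>();
    private final List<String> replaced = new ArrayList<>();

    synchronized void addTail(String lastValueKey, String payload) {
        if (lastValueKey == null) {
            // No last-value property: behaves like an ordinary queue.
            queue.addLast(new Holder(null, payload));
            return;
        }
        Holder h = byKey.get(lastValueKey);
        if (h != null) {
            // Stands in for acking the old ref with a "replaced" reason.
            replaced.add(h.payload);
            h.payload = payload;
        } else {
            h = new Holder(lastValueKey, payload);
            byKey.put(lastValueKey, h);
            queue.addLast(h);
        }
    }

    synchronized String poll() {
        Holder h = queue.pollFirst();
        if (h == null) {
            return null;
        }
        if (h.key != null) {
            byKey.remove(h.key); // simplification: forget the key once consumed
        }
        return h.payload;
    }

    synchronized int size() {
        return queue.size();
    }

    synchronized List<String> replacedPayloads() {
        return new ArrayList<>(replaced);
    }

    public static void main(String[] args) {
        SketchLastValueQueue q = new SketchLastValueQueue();
        q.addTail("ACME", "price=10");
        q.addTail("INIT", "price=5");
        q.addTail("ACME", "price=11"); // replaces price=10 in place
        System.out.println(q.size());  // prints 2
        System.out.println(q.poll());  // prints price=11
    }
}
```

Because the holder, not the message, occupies the queue slot, the newer ACME value is delivered from the position of the value it replaced, ahead of INIT.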

Summary

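CoreMessage provides getLastValueProperty and setLastValueProperty, which read and write the message's Message.HDR_LAST_VALUE_NAME (_AMQ_LVQ_NAME) property. MessageReferenceImpl provides getLastValueProperty, which first reads the message property named by queue.getLastValueKey() and falls back to message.getLastValueProperty() when that value is null. LastValueQueue extends QueueImpl; its addTail and addHead methods resolve the last-value property via ref.getLastValueProperty() and look up the matching HolderReference in the map. When a HolderReference already exists, addTail (and addHead when scheduling is true) calls replaceLVQMessage, which swaps in the new ref and acknowledges oldRef, while addHead with scheduling set to false keeps the current ref and acknowledges the one being returned.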

doc

  • LastValueQueue
