聊聊artemis的maxDeliveryAttempts

编程

maxDeliveryAttempts

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java

public class AddressSettings implements Mergeable<AddressSettings>, Serializable, EncodingSupport {

//......

public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;

private Integer maxDeliveryAttempts = null;

private SimpleString deadLetterAddress = null;

//......

public int getMaxDeliveryAttempts() {

return maxDeliveryAttempts != null ? maxDeliveryAttempts : AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;

}

public AddressSettings setMaxDeliveryAttempts(final int maxDeliveryAttempts) {

this.maxDeliveryAttempts = maxDeliveryAttempts;

return this;

}

public SimpleString getDeadLetterAddress() {

return deadLetterAddress;

}

public AddressSettings setDeadLetterAddress(final SimpleString deadLetterAddress) {

this.deadLetterAddress = deadLetterAddress;

return this;

}

//......

}

  • AddressSettings定义了maxDeliveryAttempts及deadLetterAddress属性,其getMaxDeliveryAttempts方法在maxDeliveryAttempts为null时返回AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS,默认值为10

checkRedelivery

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

//......

public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,

final long timeBase,

final boolean ignoreRedeliveryDelay) throws Exception {

Message message = reference.getMessage();

if (internalQueue) {

if (logger.isTraceEnabled()) {

logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");

}

// no DLQ check on internal queues

return new Pair<>(true, false);

}

if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {

storageManager.updateDeliveryCount(reference);

}

AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());

int maxDeliveries = addressSettings.getMaxDeliveryAttempts();

int deliveryCount = reference.getDeliveryCount();

// First check DLA

if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {

if (logger.isTraceEnabled()) {

logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());

}

boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());

return new Pair<>(false, dlaResult);

} else {

// Second check Redelivery Delay

long redeliveryDelay = addressSettings.getRedeliveryDelay();

if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {

redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);

if (logger.isTraceEnabled()) {

logger.trace("Setting redeliveryDelay=" + redeliveryDelay + " on reference=" + reference);

}

reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);

if (!reference.isPaged() && reference.isDurable() && isDurable()) {

storageManager.updateScheduledDeliveryTime(reference);

}

}

decDelivering(reference);

return new Pair<>(true, false);

}

}

//......

}

  • QueueImpl的checkRedelivery方法会对比deliveryCount与maxDeliveries,当deliveryCount大于0且大于等于maxDeliveries时会执行sendToDeadLetterAddress

sendToDeadLetterAddress

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

//......

private boolean sendToDeadLetterAddress(final Transaction tx,

final MessageReference ref,

final SimpleString deadLetterAddress) throws Exception {

if (deadLetterAddress != null) {

Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);

if (bindingList == null || bindingList.getBindings().isEmpty()) {

ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);

ref.acknowledge(tx, AckReason.KILLED, null);

} else {

ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);

move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);

return true;

}

} else {

ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);

ref.acknowledge(tx, AckReason.KILLED, null);

}

return false;

}

private void move(final Transaction originalTX,

final SimpleString address,

final Binding binding,

final MessageReference ref,

final boolean rejectDuplicate,

final AckReason reason,

final ServerConsumer consumer) throws Exception {

Transaction tx;

if (originalTX != null) {

tx = originalTX;

} else {

// if no TX we create a new one to commit at the end

tx = new TransactionImpl(storageManager);

}

Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);

copyMessage.setAddress(address);

postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);

acknowledge(tx, ref, reason, consumer);

if (originalTX == null) {

tx.commit();

}

}

//......

}

  • sendToDeadLetterAddress方法在bindingList不为空的情况下会执行move操作,move到deadLetterAddress,其AckReason为AckReason.KILLED

incrementDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

//......

public HandleStatus handle(final MessageReference ref) throws Exception {

// available credits can be set back to null with a flow control option.

AtomicInteger checkInteger = availableCredits;

if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {

if (logger.isDebugEnabled()) {

logger.debug(this + " is busy for the lack of credits. Current credits = " +

availableCredits +

" Can"t receive reference " +

ref);

}

return HandleStatus.BUSY;

}

synchronized (lock) {

// If the consumer is stopped then we don"t accept the message, it

// should go back into the

// queue for delivery later.

// TCP-flow control has to be done first than everything else otherwise we may lose notifications

if ((callback != null && !callback.isWritable(this, protocolContext)) || !started || transferring) {

return HandleStatus.BUSY;

}

// If there is a pendingLargeMessage we can"t take another message

// This has to be checked inside the lock as the set to null is done inside the lock

if (largeMessageDeliverer != null) {

if (logger.isDebugEnabled()) {

logger.debug(this + " is busy delivering large message " +

largeMessageDeliverer +

", can"t deliver reference " +

ref);

}

return HandleStatus.BUSY;

}

final Message message = ref.getMessage();

if (!message.acceptsConsumer(sequentialID())) {

return HandleStatus.NO_MATCH;

}

if (filter != null && !filter.match(message)) {

if (logger.isTraceEnabled()) {

logger.trace("Reference " + ref + " is a noMatch on consumer " + this);

}

return HandleStatus.NO_MATCH;

}

if (logger.isTraceEnabled()) {

logger.trace("ServerConsumerImpl::" + this + " Handling reference " + ref);

}

if (!browseOnly) {

if (!preAcknowledge) {

deliveringRefs.add(ref);

}

ref.handled();

ref.setConsumerId(this.id);

ref.incrementDeliveryCount();

// If updateDeliveries = false (set by strict-update),

// the updateDeliveryCountAfterCancel would still be updated after c

if (strictUpdateDeliveryCount && !ref.isPaged()) {

if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&

!ref.getQueue().isInternalQueue() &&

!ref.isPaged()) {

storageManager.updateDeliveryCount(ref);

}

}

if (preAcknowledge) {

if (message.isLargeMessage()) {

// we must hold one reference, or the file will be deleted before it could be delivered

((LargeServerMessage) message).incrementDelayDeletionCount();

}

// With pre-ack, we ack *before* sending to the client

ref.getQueue().acknowledge(ref, this);

acks++;

}

if (message.isLargeMessage() && this.supportLargeMessage) {

largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, ref);

}

}

pendingDelivery.countUp();

return HandleStatus.HANDLED;

}

}

//......

}

  • ServerConsumerImpl的handle方法会在非browseOnly的情况下会调用ref.incrementDeliveryCount()来增加deliveryCount;必要的时候会执行storageManager.updateDeliveryCount(ref)

updateDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java

public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {

//......

public void updateDeliveryCount(final MessageReference ref) throws Exception {

// no need to store if it"s the same value

// otherwise the journal will get OME in case of lots of redeliveries

if (ref.getDeliveryCount() == ref.getPersistedCount()) {

return;

}

ref.setPersistedCount(ref.getDeliveryCount());

DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());

readLock();

try {

messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));

} finally {

readUnLock();

}

}

//......

}

  • AbstractJournalStorageManager的updateDeliveryCount方法会更新persistedCount到storage

小结

AddressSettings定义了maxDeliveryAttempts及deadLetterAddress属性,其getMaxDeliveryAttempts方法在maxDeliveryAttempts为null时返回AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS,默认值为10;QueueImpl的checkRedelivery方法会对比deliveryCount与maxDeliveries,当deliveryCount大于0且大于等于maxDeliveries时会执行sendToDeadLetterAddress;sendToDeadLetterAddress方法在bindingList不为空的情况下会执行move操作,move到deadLetterAddress,其AckReason为AckReason.KILLED

doc

  • QueueImpl

以上是 聊聊artemis的maxDeliveryAttempts 的全部内容, 来源链接: utcz.com/z/513021.html

回到顶部