[java] Does retrying one RabbitMQ message block the messages that arrive after it?

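I use RetryOperationsInterceptor for the retry mechanism. If one message throws an exception and is retried, will the messages that arrive after it stay blocked the whole time? Or is the retried message put at the tail of the queue so that the other messages can be consumed?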

    @Bean
    SimpleRabbitListenerContainerFactory lowLoadRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
        factory.connectionFactory = connectionFactory
        factory.concurrentConsumers = 1
        factory.maxConcurrentConsumers = 1
        factory.recoveryInterval = 1000L
        factory.setAdviceChain(retryOperationsInterceptor())
        return factory
    }

    @Bean
    RetryOperationsInterceptor retryOperationsInterceptor() {
        RetryTemplate retryTemplate = new RetryTemplate()
        RetryPolicy retryPolicy = new SimpleRetryPolicy(Integer.MAX_VALUE)
        retryPolicy.setMaxAttempts(5)
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy()
        backOffPolicy.setInitialInterval(60000)
        backOffPolicy.setMultiplier(2)
        backOffPolicy.setMaxInterval(3600000)
        retryTemplate.setRetryPolicy(retryPolicy)
        retryTemplate.setBackOffPolicy(backOffPolicy)
        retryTemplate.registerListener(new RetryListener() {
            @Override
            <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                return true
            }

            @Override
            <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                if (throwable != null) {
                    log.error("Failed: Retry count " + context.getRetryCount(), throwable)
                }
            }

            @Override
            <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                log.error("Retry count " + context.getRetryCount(), throwable)
            }
        })
        RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
                .retryOperations(retryTemplate)
                .build()
        return interceptor
    }

Answer

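Once the retries are exhausted the message is handed off (republished, for example), so it does not block the messages behind it indefinitely. With a stateless interceptor, though, the retry attempts and their back-off waits run on the listener thread, so a single consumer does not pick up later messages until the retries have finished.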
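As a rough check on how long one failing message can occupy the consumer, the back-off settings from the question (initial interval 60000 ms, multiplier 2, max interval 3600000 ms, 5 attempts) can be tabulated. The following is a minimal sketch in plain Java that only mimics exponential back-off arithmetic. It is not the spring-retry implementation, the class and method names are hypothetical, and it assumes one wait after each failed attempt except the last.

```java
import java.util.ArrayList;
import java.util.List;

class BackOffSchedule {

    /** Waits in ms between attempts: one wait after each failed attempt except the last. */
    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        long interval = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min(interval, maxInterval));
            // grow the interval for the next wait, capped at maxInterval
            interval = (long) Math.min(interval * multiplier, (double) maxInterval);
        }
        return result;
    }

    /** Sum of all waits in ms. */
    static long total(List<Long> delays) {
        long sum = 0;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Long> d = delays(60_000L, 2.0, 3_600_000L, 5);
        System.out.println(d);        // [60000, 120000, 240000, 480000]
        System.out.println(total(d)); // 900000
    }
}
```

Under these assumptions the waits add up to 900000 ms, about 15 minutes, not counting the time spent in the listener itself.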

Reading the spring-amqp source makes this clear: handling a message that has exhausted its retries is delegated to MessageRecoverer.

    package org.springframework.amqp.rabbit.retry;

    import org.springframework.amqp.core.Message;

    /**
     * @author Dave Syer
     *
     */
    public interface MessageRecoverer {

        /**
         * Callback for message that was consumed but failed all retry attempts.
         *
         * @param message the message to recover
         * @param cause the cause of the error
         */
        void recover(Message message, Throwable cause);

    }
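The order of events implied by this callback can be illustrated with a toy model. The sketch below is plain Java, not spring-amqp code: the queue, the `handle` method and the log strings are hypothetical stand-ins, and the "recover" step only marks the point where `MessageRecoverer.recover(...)` would be called.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SingleConsumerRetryDemo {

    /** Drains the queue on one thread, retrying inline, and returns an event log. */
    static List<String> run(List<String> messages, int maxAttempts) {
        List<String> log = new ArrayList<>();
        Deque<String> queue = new ArrayDeque<>(messages);
        while (!queue.isEmpty()) {
            String msg = queue.poll();
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    handle(msg);
                    log.add("ok " + msg);
                    done = true;
                } catch (RuntimeException e) {
                    log.add("fail " + msg + " #" + attempt);
                    // a back-off policy would sleep here, on this same thread
                }
            }
            if (!done) {
                // stand-in for MessageRecoverer.recover(message, cause)
                log.add("recover " + msg);
            }
        }
        return log;
    }

    /** Hypothetical listener: messages starting with "bad" always fail. */
    static void handle(String msg) {
        if (msg.startsWith("bad")) {
            throw new IllegalStateException("cannot process " + msg);
        }
    }

    public static void main(String[] args) {
        run(List.of("bad-1", "good-2", "good-3"), 3).forEach(System.out::println);
    }
}
```

In this model the recoverer is reached only after the last failed attempt, and the later messages are handled right after it.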

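In spring-amqp this interface has two implementations: RejectAndDontRequeueRecoverer and RepublishMessageRecoverer. As the names suggest, they correspond to two strategies. The former rejects the message without requeueing it, so it ends up in a dead-letter queue (when one is configured for the queue); the latter republishes it. The comments on these two implementation classes describe the mechanics in more detail.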
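To choose one of these strategies explicitly, a recoverer can be passed to the interceptor builder. The following is a hedged configuration sketch in Java, assuming a spring-amqp version that provides `RetryInterceptorBuilder`; the class name, the exchange name "errors.exchange" and the routing key "errors.key" are hypothetical, and the exchange must already exist on the broker.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

@Configuration
class RetryRecovererConfig {

    @Bean
    RetryOperationsInterceptor retryOperationsInterceptor(RabbitTemplate rabbitTemplate) {
        return RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                // initial interval, multiplier, max interval (values taken from the question)
                .backOffOptions(60_000L, 2.0, 3_600_000L)
                // "errors.exchange" and "errors.key" are hypothetical names
                .recoverer(new RepublishMessageRecoverer(rabbitTemplate, "errors.exchange", "errors.key"))
                .build();
    }
}
```

Replacing the recoverer argument with `new RejectAndDontRequeueRecoverer()` would select the reject strategy instead.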

--- RepublishMessageRecoverer

    @Override
    public void recover(Message message, Throwable cause) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
        headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
        Map<? extends String, ? extends Object> additionalHeaders = additionalHeaders(message, cause);
        if (additionalHeaders != null) {
            headers.putAll(additionalHeaders);
        }

        if (null != this.errorExchangeName) {
            String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey : this.prefixedOriginalRoutingKey(message);
            this.errorTemplate.send(this.errorExchangeName, routingKey, message);
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Republishing failed message to exchange " + this.errorExchangeName);
            }
        }
        else {
            final String routingKey = this.prefixedOriginalRoutingKey(message);
            this.errorTemplate.send(routingKey, message);
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Republishing failed message to the template's default exchange with routing key " + routingKey);
            }
        }
    }
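The destination logic in the method above can be restated as a small standalone function. This is a simplified model in plain Java, not library code: the class `RepublishTarget` is hypothetical, the string "<template default>" merely stands for the template's default exchange, and the "error." prefix used in the example is an assumption about what `prefixedOriginalRoutingKey` prepends.

```java
class RepublishTarget {
    final String exchange;
    final String routingKey;

    RepublishTarget(String exchange, String routingKey) {
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    /** Mirrors the if/else in recover(): where would the failed message be sent? */
    static RepublishTarget resolve(String errorExchange, String errorRoutingKey,
                                   String keyPrefix, String receivedRoutingKey) {
        String prefixed = keyPrefix + receivedRoutingKey;
        if (errorExchange != null) {
            // explicit error exchange: use the explicit key if set, else the prefixed original key
            return new RepublishTarget(errorExchange, errorRoutingKey != null ? errorRoutingKey : prefixed);
        }
        // no error exchange: template's default exchange with the prefixed original key
        return new RepublishTarget("<template default>", prefixed);
    }

    @Override
    public String toString() {
        return exchange + " / " + routingKey;
    }

    public static void main(String[] args) {
        System.out.println(resolve("errors.exchange", null, "error.", "orders.created"));
        // errors.exchange / error.orders.created
        System.out.println(resolve(null, null, "error.", "orders.created"));
        // <template default> / error.orders.created
    }
}
```

Either way the message goes to a separate destination and is not put back at the tail of the original queue, unless the exchange bindings route it there.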

Source link: utcz.com/a/75341.html
