【java】rabbitmq某消息重试会导致其后到的消息被阻塞吗?
利用RetryOperationsInterceptor做重试机制,假如某个消息抛异常重试,会导致这个消息之后的来的消息一直被阻塞吗?还是这个重试的消息被插到队尾,让其他消息消费?
@BeanSimpleRabbitListenerContainerFactory lowLoadRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
factory.connectionFactory = connectionFactory
factory.concurrentConsumers = 1
factory.maxConcurrentConsumers = 1
factory.recoveryInterval = 1000L
factory.setAdviceChain(retryOperationsInterceptor())
return factory
}
@Bean
RetryOperationsInterceptor retryOperationsInterceptor() {
RetryTemplate retryTemplate = new RetryTemplate()
RetryPolicy retryPolicy = new SimpleRetryPolicy(Integer.MAX_VALUE)
retryPolicy.setMaxAttempts(5)
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy()
backOffPolicy.setInitialInterval(60000)
backOffPolicy.setMultiplier(2)
backOffPolicy.setMaxInterval(3600000)
retryTemplate.setRetryPolicy(retryPolicy)
retryTemplate.setBackOffPolicy(backOffPolicy)
retryTemplate.registerListener(new RetryListener() {
@Override
<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true
}
@Override
<T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
if (throwable != null) {
log.error("Failed: Retry count " + context.getRetryCount(), throwable)
}
}
@Override
<T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.error("Retry count " + context.getRetryCount(), throwable)
}
})
RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
.retryOperations(retryTemplate)
.build()
return interceptor
}
回答
会重新发布的,不会阻塞后面的
通过看spring-amqp
的源码知道。这个职责是委托给MessageRecoverer
来完成的。
package org.springframework.amqp.rabbit.retry;import org.springframework.amqp.core.Message;
/**
* @author Dave Syer
*
*/
public interface MessageRecoverer {
/**
* Callback for message that was consumed but failed all retry attempts.
*
* @param message the message to recover
* @param cause the cause of the error
*/
void recover(Message message, Throwable cause);
}
这个接口在spring-amqp
中有两个实现,一个是 RejectAndDontRequeueRecoverer
和 RepublishMessageRecoverer
,见名知意,有两个策略。前一种是放入死信队列,后一种是重新发布。更详细的机制看这两个实现类的注释会更加的清晰。
--- RepublishMessageRecoverer @Override
public void recover(Message message, Throwable cause) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
Map<? extends String, ? extends Object> additionalHeaders = additionalHeaders(message, cause);
if (additionalHeaders != null) {
headers.putAll(additionalHeaders);
}
if (null != this.errorExchangeName) {
String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey : this.prefixedOriginalRoutingKey(message);
this.errorTemplate.send(this.errorExchangeName, routingKey, message);
if (this.logger.isWarnEnabled()) {
this.logger.warn("Republishing failed message to exchange " + this.errorExchangeName);
}
}
else {
final String routingKey = this.prefixedOriginalRoutingKey(message);
this.errorTemplate.send(routingKey, message);
if (this.logger.isWarnEnabled()) {
this.logger.warn("Republishing failed message to the template's default exchange with routing key " + routingKey);
}
}
}
以上是 【java】rabbitmq某消息重试会导致其后到的消息被阻塞吗? 的全部内容, 来源链接: utcz.com/a/75341.html