Logging exceptions thrown by message listeners for Spring AMQP
I have an application which handles messages using an async consumer (via @RabbitListener). Inside this consumer method, an exception occurs and the message is requeued due to the policies I've defined:
spring:
  rabbitmq:
    listener:
      simple:
        default-requeue-rejected: false
        retry:
          enabled: true
          max-attempts: 10
          initial-interval: 60000 # a minute
          multiplier: 2
          max-interval: 600000 # 10 minutes
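To make the effect of these settings concrete, here is a rough plain-Java model of the back-off schedule they produce. It is only a sketch: the class and method names (`BackOffSchedule`, `intervals`) are made up for illustration, and it assumes the usual exponential back-off behaviour, where each wait is multiplied by `multiplier`, capped at `max-interval`, and no wait follows the final attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring: models the waits implied by the retry settings.
class BackOffSchedule {

    /** Returns the waits (in ms) between consecutive attempts. */
    static List<Long> intervals(long initialInterval, double multiplier,
            long maxInterval, int maxAttempts) {
        List<Long> waits = new ArrayList<>();
        long next = initialInterval;
        // N attempts are separated by N - 1 waits (assumption: no wait after the last failure)
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            waits.add(Math.min(next, maxInterval)); // cap each wait at max-interval
            next = (long) (next * multiplier);
        }
        return waits;
    }

    public static void main(String[] args) {
        List<Long> waits = intervals(60_000L, 2.0, 600_000L, 10);
        long total = 0;
        for (long wait : waits) {
            total += wait;
        }
        System.out.println("Waits (ms): " + waits);
        System.out.println("Total back-off: " + (total / 60_000) + " minutes");
    }
}
```

Under these assumptions the waits are 1, 2, 4 and 8 minutes followed by five 10-minute waits, so a message that always fails occupies the consumer thread for about 65 minutes before retries are exhausted.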
The consumer method calls a private method that recursively fetches data from the DB and pushes it into a queue using RabbitTemplate. I expect around 200 messages in this queue; however, it grows to around 700k, and then the consumer thread stops because the retry policy is exhausted.
The problem is that I cannot find any place to log the exception, so I cannot tell which part of the business logic causes the issue. I could wrap the whole function in a try/catch block and log the exception before rethrowing it for Spring AMQP's exception handling, but I want to know whether a better approach exists.
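For reference, the try/catch fallback described above could look roughly like the following. This is a plain-Java sketch with no Spring dependency: `LoggingListener` and `logAndRethrow` are hypothetical names, `java.util.logging` stands in for whatever logging framework the application uses, and the lambda stands in for the body of the listener method.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical wrapper: logs any failure from a message handler, then rethrows it.
class LoggingListener {

    private static final Logger logger = Logger.getLogger(LoggingListener.class.getName());

    /** Wraps a handler so a failure is logged with its stack trace and rethrown unchanged. */
    static <T> Consumer<T> logAndRethrow(Consumer<T> handler) {
        return message -> {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                logger.log(Level.SEVERE, "Listener failed for message: " + message, e);
                throw e; // rethrow so the caller's retry handling still sees the failure
            }
        };
    }

    public static void main(String[] args) {
        // Stand-in for the real listener body, which here always fails
        Consumer<String> handler = LoggingListener.<String>logAndRethrow(msg -> {
            throw new IllegalStateException("simulated failure while processing " + msg);
        });
        try {
            handler.accept("order-42");
        } catch (IllegalStateException e) {
            System.out.println("Rethrown: " + e.getMessage());
        }
    }
}
```

The drawback is that every listener needs the same wrapping, which is why a hook in the retry machinery itself is more attractive.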
My project has the following dependencies:
Spring Boot: 1.5.9.RELEASE
Spring AMQP: 1.7.4.RELEASE
RabbitMQ: 3.7.2
We should probably try to make it a bit easier to add a RetryListener, but you can do it now by replacing the retry interceptor as follows...
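import org.aopalliance.aop.Advice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@SpringBootApplication
public class So48331502Application {

    private static final Logger logger = LoggerFactory.getLogger(So48331502Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So48331502Application.class, args);
    }

    // Replaces the container's advice chain before the (not yet started) containers start.
    @Bean
    public ApplicationRunner runner(RabbitListenerEndpointRegistry registry,
            RabbitProperties properties, Advice interceptor) {
        return args -> {
            ListenerRetry retry = properties.getListener().getSimple().getRetry();
            if (retry.isEnabled()) {
                SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) registry
                        .getListenerContainer("myListener");
                container.setAdviceChain(interceptor);
            }
            registry.start();
        };
    }

    // Rebuilds the retry interceptor from the Boot properties and adds a logging RetryListener.
    @Bean
    public StatelessRetryOperationsInterceptorFactoryBean interceptor(RabbitProperties properties) {
        ListenerRetry retry = properties.getListener().getSimple().getRetry();
        RetryTemplate retryTemplate = new RetryTemplate();
        RetryPolicy retryPolicy = new SimpleRetryPolicy(retry.getMaxAttempts());
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(retry.getInitialInterval());
        backOffPolicy.setMultiplier(retry.getMultiplier());
        backOffPolicy.setMaxInterval(retry.getMaxInterval());
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.registerListener(
            new RetryListener() {

                @Override
                public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                    return true;
                }

                // Called once when the retry operation ends; throwable is the last failure, if any.
                @Override
                public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
                    if (throwable != null) {
                        logger.info("Failed: Retry count " + context.getRetryCount(), throwable);
                    }
                }

                // Called after every failed attempt.
                @Override
                public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
                    logger.info("Retry count " + context.getRetryCount(), throwable);
                }

            });
        StatelessRetryOperationsInterceptorFactoryBean interceptor =
                new StatelessRetryOperationsInterceptorFactoryBean();
        interceptor.setRetryOperations(retryTemplate);
        return interceptor;
    }

    // Test listener that always fails, to exercise the retry logging.
    @RabbitListener(id = "myListener", queues = "one")
    public void in(Object in) {
        throw new RuntimeException();
    }

}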
Note that you have to set auto-startup to false so you can change the advice chain...
spring:
  rabbitmq:
    listener:
      simple:
        auto-startup: 'false'
        retry:
          enabled: 'true'
then start the registry, which will start all the containers.