Spring AMQP: would like to put message to queue and send ACK immediately
I wrote a Java application that sends messages to RabbitMQ. Flume then picks the messages up from the RabbitMQ queue. I want to make sure that nobody except Flume pulls messages from the queue.
My application uses the Spring AMQP Java library.
The problem:
With the code below, a message arrives in the RabbitMQ queue and stays 'Unacknowledged' forever. As I understand it, RabbitMQ is waiting for an ACK from the MessageListener, but the MessageListener will never ACK. Does anybody have an idea how to fix this?
The code:
public class MyAmqpConfiguration {
    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(activityLogsQueue());
        container.setMessageListener(MyMessageListener());
        container.setConcurrentConsumers(3);
        return container;
    }

    @Bean(name="myTemplate")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(MyMessageConverter());
        return template;
    }
}

public class MyMessageListener implements MessageListener {
    public MyMessageListener(MessageConverter converter, MyMessageHandler<MyObject> messageHandler) {
        this.converter = converter;
        this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(Message message) {
        this.messageHandler.doThings();
    }
}

public class MyMessageHandler {
    @Autowired
    @Qualifier("myTemplate")
    RabbitTemplate template;

    @Override
    public void handleMessage(MyObject thing) {
        template.convertAndSend(exchange, routingKey, thing);
    }
}

public class MyMessageConverter extends JsonMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        //do things
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        throw new UnsupportedOperationException("fromMessage is not supported in "+this.getClass().getName());
    }
}
If you don't want to have to ACK each message, you can set the AcknowledgeMode on the SimpleMessageListenerContainer:
    container.setAcknowledgeMode(AcknowledgeMode.NONE);
Take a look at the API reference for more info.
Update: Should be AcknowledgeMode.NONE
Set to AcknowledgeMode.NONE to tell the broker not to expect any acknowledgements, and it will assume all messages are acknowledged as soon as they are sent (this is "autoack" in native Rabbit broker terms). If AcknowledgeMode.NONE then the channel cannot be transactional (so the container will fail on start up if that flag is accidentally set).
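To show where that setting goes, here is a minimal sketch of a listener container bean with acknowledgements turned off. The class name, the bean name, and the queue name "activityLogs" are assumptions made up for illustration; they do not come from the question's code. It also assumes that a ConnectionFactory and a MessageListener bean are already defined elsewhere in the application context.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NoAckListenerConfiguration {

    // Hypothetical queue name, used only for illustration.
    private static final String QUEUE_NAME = "activityLogs";

    @Bean
    public SimpleMessageListenerContainer noAckListenerContainer(
            ConnectionFactory connectionFactory, MessageListener listener) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(listener);
        container.setConcurrentConsumers(3);
        // NONE: the broker treats a message as acknowledged as soon as it
        // has been sent to the consumer ("autoack" in RabbitMQ terms).
        // The channel must not be transactional in this mode.
        container.setAcknowledgeMode(AcknowledgeMode.NONE);
        return container;
    }
}
```

The trade-off is that the broker forgets a message the moment it is delivered, so a message is lost if the listener fails while processing it.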
Here is the discussion that led to the real solution:
http://forum.springsource.org/showthread.php?129304-Spring-AMQP-would-like-to-put-message-to-queue-and-send-ACK-immediately&p=422064&posted=1#post422064