Purge a RabbitMQ queue using the Spring AMQP template?

I am adding messages to a RabbitMQ queue using the Spring AMQP template in my Spring Batch item writer.

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T> {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;
    BlockingQueue<Object> blockingQueue;


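    // Called by the reply listener; hands each reply (or exception) to write().
    public void onMessage(Object msgContent) {
        try {
            blockingQueue.put(msgContent);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }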

    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {

            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);

        }

        for (T item : items) {

            Object msg = blockingQueue.poll(60, TimeUnit.SECONDS);

            if (msg instanceof Exception) {
                throw (Exception) msg;
            } else if (msg == null) {
                System.out.println("reply timeout...");
                break;
            } 

        }
    }

}

The messages are processed on different remote servers. I am trying to handle the case where processing of a message fails (due to some exception), in which case the step execution is stopped.

When that happens, I want to purge all the remaining messages in the queue so that they are not consumed and processed, since they would fail as well.

If the step fails, my item writer will queue all the messages again on the next run, so I need to purge any remaining messages whenever an exception occurs.

How can I purge the queue using Spring AMQP?


I was able to do it using RabbitAdmin (the second argument is noWait):

admin.purgeQueue(this.queue, true);


I would use RabbitAdmin instead of the template:

http://docs.spring.io/autorepo/docs/spring-amqp-dist/1.3.4.RELEASE/api/org/springframework/amqp/rabbit/core/RabbitAdmin.html#purgeQueue%28java.lang.String,%20boolean%29

@Autowired private RabbitAdmin admin;

...

admin.purgeQueue("queueName", false);
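To tie the purge to the failure path described in the question, a minimal sketch might look like the following. It is not the asker's actual writer: QueuePurger, RecordingPurger, awaitReplies and the queue name "work.queue" are hypothetical names introduced for illustration, and the purge is hidden behind an interface so the example runs without a broker. In a real setup the interface would presumably delegate to admin.purgeQueue(queueName, false). The sketch also assumes a reply timeout should count as a failure, whereas the original loop only breaks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PurgeOnFailureDemo {

    /** Hypothetical abstraction; a real implementation could call admin.purgeQueue(queueName, false). */
    interface QueuePurger {
        void purge(String queueName);
    }

    /** Stand-in used here instead of a broker: it only records which queues were purged. */
    static class RecordingPurger implements QueuePurger {
        final List<String> purged = new ArrayList<>();

        @Override
        public void purge(String queueName) {
            purged.add(queueName);
        }
    }

    /**
     * Waits for one reply per sent item. If a reply is an exception, or no reply
     * arrives in time (assumption: a timeout counts as a failure), the work queue
     * is purged before the exception is rethrown to fail the step.
     */
    static void awaitReplies(BlockingQueue<Object> replies, int expected, String queueName,
                             QueuePurger purger, long timeout, TimeUnit unit) throws Exception {
        try {
            for (int i = 0; i < expected; i++) {
                Object reply = replies.poll(timeout, unit);
                if (reply instanceof Exception) {
                    throw (Exception) reply;
                }
                if (reply == null) {
                    throw new TimeoutException("reply timeout");
                }
            }
        } catch (Exception e) {
            // Drop the remaining messages so they are not consumed after the step fails.
            purger.purge(queueName);
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Object> replies = new LinkedBlockingQueue<>();
        replies.put("ok");
        replies.put(new IllegalStateException("processing failed"));

        RecordingPurger purger = new RecordingPurger();
        try {
            awaitReplies(replies, 3, "work.queue", purger, 100, TimeUnit.MILLISECONDS);
        } catch (IllegalStateException e) {
            System.out.println("step failed: " + e.getMessage() + ", purged " + purger.purged);
        }
    }
}
```

Purging in the catch block keeps the purge next to the rethrow, so the step still fails with the original exception and the queue is empty before the writer re-queues the items on a restart.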


You can use the queuePurge method on the RabbitMQ Java client's Channel:

AMQP.Queue.PurgeOk queuePurge(java.lang.String queue)

See queue.purge in the AMQP 0-9-1 quick reference:

http://www.rabbitmq.com/amqp-0-9-1-quickref.html#queue.purge
