AMQP gem for rails is requeuing hundreds of successfully processed messages

Why, when I have a lot of messages on a queue (1200), are my messages being requeued even though my code processes them successfully and "acks" them? And how can I fix this?

I have a Rails application that uses the amqp gem to talk to RabbitMQ. We put messages on a queue with information about emails that need to be sent, and the subscriber takes them off and sends them.

Sometimes hundreds of messages will be placed on the queue in quick succession.

We use acknowledgements to be sure that messages are not lost.

It was working very well until recently when I found that there were 1200 messages on the queue and they were not being consumed.

So why was my consumer not consuming them?

Looking at the logs I found that yes, it had consumed them and the emails were sent. I restarted the consumer and it reconsumed them, meaning we sent multiples of the same email to users. Yikes! But what I noticed by watching the RabbitMQ UI was that when I restarted the consumer, it took all 1200 messages off the queue at once. Then after a few minutes, these messages were requeued, even though my consumer was still going through them and sending the emails. In our code, the consumer does ack the message after each email is sent (message processed).

So my best guess at what is happening is that when there are lots of messages on the queue, the consumer takes them all off but does not ack each one separately; instead it waits until all the messages have been processed before doing a mass ack. As this takes a long time (10 minutes), something on the RabbitMQ side decides that this is taking too long and requeues all those messages, even while my consumer is still processing them successfully.

I have looked around a lot and found something called a heartbeat, but I cannot find any clear explanation of what this is and how to use it, if I need to use it at all. But it sounds like it could be related to communication between the queue and the consumer and could be the key to not having all those messages requeued while they are being processed.

Another thing I tried was setting prefetch to 1, as described in one of the docs I found. It did not seem appropriate because I only have one consumer, but it sounded hopeful because it looked as though it might force one-by-one acknowledgement of messages.

Should I consider multiple consumers given that we could get hundreds of messages placed on the queue in quick succession?

Here is my rake task to subscribe to the queue

task :subscribe_basic => :environment do |task_name|
  begin # make sure any exception is logged
    log = Rails.logger
    routing_key = "send_letter"
    tcp_connection_settings =
        {:host=>"localhost",
         :port=>5672,
         :vhost=>"dev_vhost",
         :user=>"dev_user",
         :pass=>"abc123",
         :timeout=>0.3,
         :ssl=>false,
         :on_tcp_connection_loss=>
             handle_conn_loss,
         :logging=>true}

    begin
      ::AMQP.start(tcp_connection_settings) do |connection|
        channel = ::AMQP::Channel.new(connection, :prefetch => 1)
        channel.auto_recovery = true
        cons = SendLetterConsumer.new channel, log

        queue = channel.queue(routing_key, exclusive: false, durable: true)

        # positional args: channel, queue, consumer_tag, exclusive, no_ack
        consumer1 = AMQP::Consumer.new(channel, queue, nil, false, false)
        consumer1.consume.on_delivery(&cons.method(:handle_message))
        log.info "subscribed to queue #{routing_key} (#{Process.pid})"

        Signal.trap 'INT' do # kill -s INT <pid> , kill -2 <pid>,  Ctrl+C
          log.info "#{task_name} stopping(#{Process.pid})..."
          channel.close { EventMachine.stop } # otherwise segfault
        end
      end
    rescue StandardError => ex
      # 2015-03-20 02:52:49 UTC MQ raised EventMachine::ConnectionError: unable to resolve server address
      log.error "MQ raised #{ex.class.name}: #{ex.message} Backtrace: #{ex.backtrace}"
    end
  rescue Exception => ex
    log.error "#{ex.class.name}: #{ex.message} -- #{ex.backtrace.inspect}"
    raise ex
  end

end

Here is the consumer code we use to handle the message (called in above code: consumer1.consume.on_delivery(&cons.method(:handle_message)) ) :

def handle_message(metadata, payload)
  logger.info "*** SendLetterConsumer#handle_message start #{Time.now}"
  logger.info payload
  begin
    # {course_app: aCourseApplication, errors:[]}
    # {course_app: aFaultyCourseApplication, errors: ['error1', 'error2']}
    msg = JSON.parse(payload)
    ca = CourseApplication.find(msg['course_application_id'])
    # find_by returns nil when the id is missing, so the elsif branch below is reachable
    am = AutomatedMessage.find_by(id: msg['automated_message_id'])
    user_name = msg['user_name']
    if am.present?
      raise "Cannot send a letter for Automated message with id #{am.id} because it does not have an associated message template" if am.message_template.nil?
      logger.info "attempt to send letter for Automated Message with id #{am.id}"
      result = LetterSender::send_letter a_course_application: ca, a_message_template: am.message_template, user_name: user_name
    elsif msg['message_template_id']
      # msg is a Hash from JSON.parse, so fields are read with [] rather than method calls
      mt = MessageTemplate.find(msg['message_template_id'])
      result = LetterSender::send_letter a_course_application: ca, a_message_template: mt, user_name: user_name
    end
    if result
      metadata.ack #'ack'-ing will remove the message from the queue - do this even if we created a faultyCourseApp
    else
      logger.error "Could not ack for #{msg}"
    end
  rescue StandardError => e
    logger.error "#{e.message} #{e.backtrace}"
    # do not 'ack' - must be a programming mistake so leave message on queue - keep connection open to cont processing other messages
    # fix bug and restart the rake task to redeliver the unacknowledged messages
  end
  logger.info "*** SendLetterConsumer#handle_message   end #{Time.now}"
end    

Prefetch was indeed the answer, but the doc I mentioned above says to configure it like this:

channel = AMQP::Channel.new(connection, :prefetch => 1)

But this did not work at all. I had to do this instead:

channel    = AMQP::Channel.new(connection)
channel.prefetch(1)

Now it works: only one message is dispatched, and the next one is not dispatched until that one is acked.

This solution is described in the RabbitMQ tutorial, not in the amqp gem docs.
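
For intuition about why the limit matters, here is a toy model in plain Ruby. It does not talk to RabbitMQ or use the amqp gem; ToyBroker and its methods are invented for this sketch. It encodes one assumption: the broker keeps pushing messages to a consumer until the number of unacked deliveries reaches the prefetch limit, with 0 meaning no limit.

```ruby
# Toy model of broker-side dispatch; not RabbitMQ or amqp gem code.
class ToyBroker
  attr_reader :ready, :unacked

  # prefetch 0 means "no limit" in this model.
  def initialize(messages, prefetch: 0)
    @ready = messages.dup
    @unacked = []
    @prefetch = prefetch
  end

  # Push messages to the consumer until the prefetch window is full.
  def dispatch
    delivered = []
    while @ready.any? && (@prefetch.zero? || @unacked.size < @prefetch)
      msg = @ready.shift
      @unacked << msg
      delivered << msg
    end
    delivered
  end

  # The consumer confirms one message, freeing a slot in the window.
  def ack(msg)
    @unacked.delete(msg)
  end
end

unlimited = ToyBroker.new((1..1200).to_a)
unlimited.dispatch
puts "no prefetch: #{unlimited.unacked.size} unacked, #{unlimited.ready.size} ready"

limited = ToyBroker.new((1..1200).to_a, prefetch: 1)
first = limited.dispatch.first
puts "prefetch 1:  #{limited.unacked.size} unacked, #{limited.ready.size} ready"
limited.ack(first)
limited.dispatch
puts "after ack:   #{limited.unacked.size} unacked, #{limited.ready.size} ready"
```

In this model the unlimited consumer is handed the whole backlog in one go, which matches what I saw in the RabbitMQ UI, while the limited one only ever holds a single unacked message.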

So what happens if I have only one consumer with prefetch and it fails to ack a message? Will messages start piling up?

YES

So it may be good to have 2 consumers, but then both of those consumers might fail to ack.

To deal with this, I am trying reject and requeue. So in my Consumer, if I do not hit the section of code where I ack the message, I use metadata.reject(:requeue=>true) and this puts the message back on the front of the queue. Yes, that's right, the "front" of the queue - bummer. This means messages will still pile up as the same failing message is continually dispatched to the one consumer.

As the first doc mentioned above says: "When there is only one consumer on a queue, make sure you do not create infinite message delivery loops by rejecting and requeueing a message from the same consumer over and over again."
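
The loop is easy to reproduce in a toy model. The sketch below is plain Ruby with no broker involved; run_consumer and the :poison flag are made up for illustration, and it assumes the behaviour I observed, namely that a requeued message goes back to the front of the queue.

```ruby
# Toy model: a message rejected with requeue goes back to the head of the queue.
def run_consumer(queue, max_deliveries:)
  processed = []
  deliveries = 0
  while queue.any? && deliveries < max_deliveries
    msg = queue.shift            # prefetch 1: take one message at a time
    deliveries += 1
    if msg[:poison]
      queue.unshift(msg)         # stands in for reject(requeue: true)
    else
      processed << msg[:id]      # stands in for ack
    end
  end
  processed
end

queue = [{ id: 1, poison: true }, { id: 2 }, { id: 3 }]
processed = run_consumer(queue, max_deliveries: 100)
puts "processed: #{processed.inspect}, still queued: #{queue.map { |m| m[:id] }.inspect}"
```

After 100 deliveries nothing has been processed: the single failing message at the head is delivered every time and the two good messages behind it never get a turn.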

Why doesn't requeue put it on the end of the queue? Wouldn't that be better? Still you would get looping messages but at least the new messages would get processed rather than pile up.

So I tried setting the prefetch to more than one: two. But I hit the same problem. As soon as 2 messages are rejected and requeued, my poor old consumer keeps getting these same ones delivered to it, rather than messages it has not rejected, so it never gets a chance to process the backlog.

How about multiple consumers? Same problem. I have 2 consumers that each prefetch x messages and metadata.reject(requeue:true) them if something goes wrong. If the front 2x messages all cause errors in my consumers, I get the same infinite loop, with messages backing up behind them. If fewer than 2x messages at the front of the queue consistently fail to be acked, the consumers gradually get through the backlog.

It seems there is no satisfactory solution.

Ideally I would like my prefetching consumers (prefetch being necessary because of the initial problem) to be able to skip acking a message they fail to consume properly and still move on to the next message in the queue. In other words, leave the bad ones in the unacknowledged messages collection rather than put them back on the queue. The problem is that with prefetch I have to reject them, or else everything stops, and I have to requeue them, or else I lose them.

One approach might be this: when a redelivered message fails to be consumed properly, my consumer rejects it without requeueing it, using metadata.reject(), and reports the message to a developer or saves it in a failed-message table in the db so we can deal with it later. (For the redelivered flag, metadata.redelivered, see the "At The Consumer" section of the doc.)
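
A minimal sketch of that approach, runnable without a broker. FakeMetadata is a stand-in I wrote for the delivery metadata (the real object comes from the amqp gem, and the exact name of its redelivered accessor should be checked against your gem version), and FAILED_MESSAGES and handle_with_single_retry are hypothetical names. The reject calls pass requeue explicitly so the sketch does not depend on the gem's default.

```ruby
# Stand-in for the delivery metadata; it only records what was called on it.
FakeMetadata = Struct.new(:redelivered, :log) do
  def ack
    log << :ack
  end

  def reject(opts = {})
    log << (opts[:requeue] ? :requeue : :drop)
  end
end

# Hypothetical store for failed messages (a DB table or an alert in practice).
FAILED_MESSAGES = []

def handle_with_single_retry(metadata, payload)
  yield payload                       # the real work, e.g. sending the letter
  metadata.ack
rescue StandardError => e
  if metadata.redelivered
    # Second failure: record it for a developer and take it off the queue.
    FAILED_MESSAGES << { payload: payload, error: e.message }
    metadata.reject(requeue: false)
  else
    # First failure: give the message one more chance.
    metadata.reject(requeue: true)
  end
end

log = []
handle_with_single_retry(FakeMetadata.new(false, log), "a") { raise "boom" }
handle_with_single_retry(FakeMetadata.new(true, log), "a") { raise "boom" }
handle_with_single_retry(FakeMetadata.new(false, log), "b") { |p| p }
p log
p FAILED_MESSAGES
```

The three calls cover a first failure (requeued), a failure on redelivery (recorded and dropped), and a success (acked). Each message gets at most one retry, because the flag only says whether a message was redelivered, not how many times.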

It would be wonderful if RabbitMQ provided a redelivery count, so that I could set a higher cut-off before giving up on requeueing, but it does not seem to do so; it only provides a redelivered flag.
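
A count can be kept by hand if the failing message is republished as a new message instead of being requeued, since a requeued message comes back unchanged. The sketch below is only the bookkeeping part, in plain Ruby: the header name "x-retry-count", the cut-off of 3 and the helper headers_for_retry are all my own inventions, not something RabbitMQ or the amqp gem provides.

```ruby
RETRY_HEADER = "x-retry-count" # hypothetical header name, not set by RabbitMQ
MAX_RETRIES = 3                # arbitrary cut-off for this sketch

# Returns the headers to republish with, or nil once the cut-off is reached.
def headers_for_retry(headers)
  headers ||= {}
  attempts = headers.fetch(RETRY_HEADER, 0).to_i
  return nil if attempts >= MAX_RETRIES
  headers.merge(RETRY_HEADER => attempts + 1)
end

h = nil
4.times do
  h = headers_for_retry(h)
  puts h.inspect
end
```

The consumer would republish with the returned headers while they are non-nil, and give up (log it, store it, alert someone) once the helper returns nil.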


My other answer said that prefetch solves the original problem but introduces a new one: with prefetch you must reject and requeue failing messages, and this leads to loops because reject(requeue:true) puts the message on the front of the queue, only to be consumed again. Multiple consumers help a bit, but you can still get into loops.

So in order to use prefetch but put failing messages on the back of the queue, I have found that a dead-letter-exchange setup works. There is an article about it written for C#, but the general idea carries over. Also see the RabbitMQ documentation on Dead Letter Exchanges.

I did not grok it at first so here is my short explanation on using the dead letter exchange for this situation:

RabbitMQ does not do delayed messages, so the idea is to use a retry queue and publish messages that fail in the consumer onto this retry queue. In turn, the retry queue expires them after a set time, which causes them to be put on the end of the main queue.

  • The consumer tries to consume a message.

  • Something goes wrong, or you catch an error, so you do not ack the message ( metadata.ack ); instead you call metadata.reject(requeue:false) and publish the message to the retry queue.

  • With a dead letter exchange configured for this retry queue, what happens is this:

      • The message sits on the retry queue for time period x (set when creating the retry queue with the argument "x-message-ttl", see below), then RabbitMQ expires it.

      • Because of the dead letter exchange configured on the retry queue with the arguments "x-dead-letter-exchange" and "x-dead-letter-routing-key" (see below), the expired message automatically goes onto the back of the main queue.

  • A great thing about this is that the retry queue does not even need any consumers.

    Here is some code I put in my consumer to publish to the retry queue

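    def publish_to_retry_queue(msg:, metadata:)
      # Declare the retry queue. Messages expire after 25 seconds and are then
      # dead-lettered to "dead_letter_exchange" with routing key "send_letter".
      @channel.queue("send_letter.retry", exclusive: false, durable: true,
                     arguments:{"x-dead-letter-exchange" => "dead_letter_exchange",
                                "x-dead-letter-routing-key" => "send_letter",
                                "x-message-ttl" => 25000})
      # Publish the copy first, so an exception raised here does not drop the
      # original. persistent is a publish option, not a queue option.
      res = @channel.default_exchange.publish(msg, routing_key: "send_letter.retry",
                                              persistent: true, headers: metadata.headers)
      metadata.reject(requeue: false)
      @logger.info "result from publishing to retry queue is"
      @logger.info  res
      res
    end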
    

    where @channel is the channel that the consumer of the main queue is using. NOTE: this requires that you have already set up the exchange called dead_letter_exchange on RabbitMQ and added a binding from it to the main queue, which in this case is the send_letter queue.
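
If you would rather do that setup from code than from the RabbitMQ UI, something like the sketch below should do it. It is an assumption-laden outline, not what I actually ran: declare_dead_letter_topology is a hypothetical helper, the exchange is assumed to be a durable direct exchange, and the main queue is declared with the same options as in the rake task so the declaration matches the existing queue. The calls used (channel.direct, channel.queue, queue.bind) are the amqp gem's usual channel and queue methods.

```ruby
# Declares the exchange and binding that the retry queue's dead-letter
# arguments point at. `channel` is expected to be an AMQP::Channel.
def declare_dead_letter_topology(channel)
  # Assumption: a durable direct exchange; the type is not stated above.
  exchange = channel.direct("dead_letter_exchange", durable: true)
  # Same name and options as the main queue in the rake task.
  main_queue = channel.queue("send_letter", exclusive: false, durable: true)
  # Expired retry messages arrive with routing key "send_letter".
  main_queue.bind(exchange, routing_key: "send_letter")
  exchange
end
```

It would be called once, with the consumer's channel, before the first message is published to the retry queue.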
