AMQP for rails的gem正在重新处理数百个成功处理的消息

为什么当我在队列(1200)上有很多消息时,即使我的代码成功处理它们并对它们进行“确认”,我的消息是否还要重新排序呢?

我怎样才能解决这个问题?

..

我有一个应用程序使用rails amqp gem来使用RabbitMQ。 我们将消息放在队列中,并提供有关需要发送的电子邮件的信息,然后用户将它们发送出去并发送出去。

有时会有数百封邮件快速连续放置在队列中。

我们使用确认来确保消息不会丢失。

直到最近,当我发现队列中有1200条消息并且它们没有被使用时,它工作得很好。

那么为什么我的消费者不会消费他们?

看着日志,我发现是的,它已经消耗了它们,并发送了电子邮件。 我重新启动了消费者,并重新启用了它们,这意味着我们将多个相同的电子邮件发送给用户。 哎呀! 但是我通过观看RabbitMQ UI发现的情况是,当我重新启动消费者时,它立即将所有1200条消息从队列中取出。 几分钟后,即使我的消费者仍在通过他们并发送电子邮件,这些消息也会被重新发送。 在我们的代码中,消费者在发送每封电子邮件(消息处理)之后都确认消息。

所以我最好猜测发生的情况是,当队列中有很多消息时,消费者会将它们全部关闭,但不会单独对每个消息进行确认,而是等待所有消息都已被处理,然后再进行批量确认。 由于这需要很长时间,10分钟,RabbitMQ方面正在发生一些事情,即嘿,这需要很长的时间,即使在我的消费者仍在成功处理它们的时候,也要让所有这些消息重新发送。

我环顾四周,发现了一种叫做心跳的东西,但是,如果我需要使用它,我无法找到任何清楚的解释,说明这是什么以及如何使用它。 但它听起来像是可能与队列和消费者之间的通信有关,并且可能是在处理它们时不会处理所有这些消息的关键。

我尝试的另一件事是使用预取:1.在这里描述。 虽然这看起来不合适,因为我只有一个消费者。 但它听起来很有希望,因为它看起来好像它可能会逐一强制确认消息。

考虑到我们可以快速连续地将数百条消息放入队列中,我是否应该考虑多个消费者?

这是我订阅队列的rake任务

task :subscribe_basic => :environment do |task_name|
  begin # make sure any exception is logged
    log = Rails.logger
    routing_key = "send_letter"
    tcp_connection_settings =
        {:host=>"localhost",
         :port=>5672,
         :vhost=>"dev_vhost",
         :user=>"dev_user",
         :pass=>"abc123",
         :timeout=>0.3,
         :ssl=>false,
         :on_tcp_connection_loss=>
             handle_conn_loss,
         :logging=>true}

    begin
      ::AMQP.start(tcp_connection_settings) do |connection|
        channel = ::AMQP::Channel.new(connection, :prefetch => 1)
        binding.pry
        channel.auto_recovery = true
        cons = SendLetterConsumer.new channel, log

        queue = channel.queue(routing_key, exclusive: false, durable: true)

        consumer1 = AMQP::Consumer.new(channel, queue, nil, exclusive = false, no_ack = false)
        consumer1.consume.on_delivery(&cons.method(:handle_message))
        log.info "subscribed to queue #{routing_key}, config_key #{config_key} (#{Process.pid})"

        Signal.trap 'INT' do # kill -s INT <pid> , kill -2 <pid>,  Ctrl+C
          log.info "#{task_name} stopping(#{Process.pid})..."
          channel.close { EventMachine.stop } # otherwise segfault
        end
      end
    rescue StandardError => ex
      # 2015-03-20 02:52:49 UTC MQ raised EventMachine::ConnectionError: unable to resolve server address
      log.error "MQ raised #{ex.class.name}: #{ex.message} Backtrace: #{ex.backtrace}"
    end
  rescue Exception => ex
    log.error "#{ex.class.name}: #{ex.message} -- #{ex.backtrace.inspect}"
    raise ex
  end

end

这里是我们用来处理消息的消费者代码(在上面的代码中调用: consumer1.consume.on_delivery(&cons.method(:handle_message)) ):

def handle_message(metadata, payload)
  logger.info "*** SendLetterConsumer#handle_message start #{Time.now}"
  logger.info payload
  begin
    # {course_app: aCourseApplication, errors:[]}
    # {course_app: aFaultyCourseApplication, errors: ['error1', 'error2']}
    msg = JSON.parse(payload)
    ca = CourseApplication.find(msg['course_application_id'])
    am = AutomatedMessage.find(msg['automated_message_id'])
    user_name = msg['user_name']
    if am.present?
      raise "Cannot send a letter for Automated message with id #{am.id} because it does not have an associated message template" if am.message_template.nil?
      logger.info "attempt to send letter for Automated Message with id #{am.id}"
      result = LetterSender::send_letter a_course_application: ca, a_message_template: am.message_template, user_name: user_name
    elsif msg.message_template_id
      mt = MessageTemplate.find(msg.message_template_id)
      result = LetterSender::send_letter a_course_application: ca, a_message_template: mt, user_name: user_name
    end
    if result
      metadata.ack #'ack'-ing will remove the message from the queue - do this even if we created a faultyCourseApp
    else
      logger.error "Could not ack for #{msg}"
    end
  rescue StandardError => e
    logger.error "#{e.message} #{e.backtrace}"
    # do not 'ack' - must be a programming mistake so leave message on queue - keep connection open to cont processing other messages
    # fix bug and restart the rake task to redeliver the unacknowledged messages
  end
  logger.info "*** SendLetterConsumer#handle_message   end #{Time.now}"
end    

预取实际上是答案,但我与上面关于这个的文档说这是通过使用:

channel = AMQP::Channel.new(connection, :prefetch => 1)

但这根本不起作用。

我必须这样做

channel    = AMQP::Channel.new(connection)
channel.prefetch(1)

现在它可以工作,只发送一条消息,等到发送下一条消息之前发送消息。

此解决方案在rabbitmq教程中描述,而不是amqp gem。

那么如果我只有一个使用者进行预取,并且它不能回复消息,会发生什么。 信息会不会堆积起来?

因此,有2个消费者可能会很好,但这两个消费者可能都不会感到失望。

为了解决这个问题,我正在尝试拒绝并要求。 因此,在我的消费者中,如果我没有点击消息中的代码段,则使用metadata.reject(:requeue=>true) ,这会将消息放回队列的前端。 是的,没错,队列的“前沿” - 无赖。 这意味着消息仍然会堆积起来,因为同样的失败消息会不断发送给一个消费者。

正如前面的链接所述:“当队列上只有一个消费者时,确保您不会通过一次又一次地拒绝和重新发送来自同一消费者的消息来创建无限的消息传递循环。”

为什么不要把它放在队列的尽头? 这不会更好吗? 你仍然会得到循环消息,但至少新消息将被处理而不是堆积起来。

所以我试着将预取设置为不止一个......两个。 但同样的问题。 一旦有2封邮件被拒绝并重新发送,我可怜的老顾客就会一直收到这些邮件,而不是拒绝那些没有拒绝的邮件,以便让它有机会处理积压的邮件。

多个消费者如何? 同样的问题。 如果出现问题,我有2个消费者预取x消息和metadata.reject(requeue:true) 。 现在,如果前面的2x消息在我的消费者中造成错误,那么我会遇到与消息备份无限循环消息相同的问题。 如果少于2x的消息始终未能在队列的前面被消除,则消费者将逐渐处理消息的积压。

似乎没有令人满意的解决方案。

理想情况下,我希望预取消费者(由于最初的问题需要预取)能够不能确认消息未正确使用,但也要移至队列中的下一条消息。 换句话说,把坏的留在未确认的消息集合中,而不是放回队列中。 问题在于,通过预取,我必须拒绝它们,否则一切都会停止,我必须重新排队,否则我会丢失它们。

一种方法可能是:在我的消费者中,当代码中没有正确使用重新传递的消息时,我会拒绝它,但不会通过使用metadata.reject()来拒绝它,并以某种方式将此消息报告给开发人员,或将其保存在db中失败的消息表,所以我们可以处理它。 (重新发布的标志metadata.redelivered请参阅“在消费者”部分)

如果rabbitmq提供了重新交付计数,那将是非常好的 - 所以我可以在不重新排队的情况下进行中断,但似乎并没有这样做,只是提供了重新输入的标志。


我的另一个回答说,预取有效地解决了这个问题,但是引入了一个新的问题,即带有预取,然后必须拒绝并重新发送失败的消息,并且由于reject(requeue:true)将其置于队列的前面只能再次消耗。 多个消费者有点帮助,但你仍然可以进入循环。

因此,为了使用预取,但将失败的消息放在队列的后面,我发现使用死信交换设置工作。 看到这篇关于它的文章,尽管它是针对C#的,但你可以看到一般的想法。 另请参阅关于死信交换的RabbitMQ文档。

我一开始并没有这么想,所以这里是我在这种情况下使用死信交换的简短解释:

RabbitMq不会执行延迟消息,因此我们的想法是使用重试队列并将消费者失败的消息发布到此重试队列中。 反过来,这个重试队列会在一段时间后终止它们,导致它们被放在主队列的末尾。

  • 消费者试图消费信息。

  • 出现错误,或者您发现错误,因此您不会确认( metadata.ack )消息,而是发现metadata.reject(requeue:false)并将其发布到重试队列中。

  • 使用此重试队列的死信交换配置会发生什么情况:

  • 消息位于时间段x的重试队列中(在参数“x-message-ttl”中创建重试队列时设置),然后RabbitMq杀死它。

  • 由于在重试队列中使用参数“x-dead-letter-exchange”和“x-dead-letter-routing-key”(见下文)配置了死信交换设置,该消息自动返回到主队列。

  • 关于这一点很棒的是重试队列甚至不需要任何消费者。

    以下是我放入消费者发布到重试队列的一些代码

    def publish_to_retry_queue(msg:, metadata:)
      @channel.queue("send_letter.retry", exclusive: false, persistent: true, durable: true,
                     arguments:{"x-dead-letter-exchange" => "dead_letter_exchange",
                                "x-dead-letter-routing-key" => "send_letter",
                                "x-message-ttl" => 25000})
      metadata.reject(requeue: false)
      res = @channel.default_exchange.publish(msg, routing_key: "send_letter.retry", headers: metadata.headers)
      @logger.info "result from publishing to retry queue is"
      @logger.info  res
      res
    end
    

    其中@channel是来自主队列的消费者正在使用的渠道。 注意,这要求您已经在rabbitmq上设置了名为dead_letter_exchange的交换,并在其中添加了一个到主队列的绑定,在这种情况下,它是send_letter队列。

    链接地址: http://www.djcxy.com/p/34073.html

    上一篇: AMQP gem for rails is requeuing hundreds of successfully processed messages

    下一篇: How delete messages from an AMQP (RabbitMQ) queue?