我怎样才能从连接本身以外的其他通道恢复未确认的AMQP消息?

看起来我保持rabbitmq服务器运行的时间越长,对于未经确认的消息而言,我遇到的麻烦就越多。 我很乐意要求他们。 事实上,似乎有一个amqp命令来执行此操作,但它只适用于您的连接正在使用的通道。 我建立了一个小小的pika脚本,至少可以试用它,但是我要么丢失了一些东西,要么不能这样做(如何使用rabbitmqctl?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

未确认的消息是通过网络传递给消费者但尚未被确认或拒绝的消息,但该消费者尚未关闭其最初接收它们的频道或连接。 因此,经纪人无法弄清楚消费者是否花了很长时间来处理这些消息,或者是否已经忘记了这些消息。 因此,它将使他们处于未被承认的状态,直到消费者死亡或被拒绝或拒绝为止。

由于那些最初消费它们的仍然活着的消费者仍然可以对这些消息进行有效处理,因此不能(据我所知)将另一个消费者插入混合体中,并尝试对它们做出外部决定。 您需要修复您的消费者,以便在处理每条消息时作出决定,而不是让旧消息未被确认。


如果消息没有被使用,只有两种方法可以让他们回到队列中:

  • basic.nack

    该命令将导致消息放回队列并重新发送。

  • 断开经纪人

    这个动作会强制所有来自这个频道的未被删除的消息被放回到队列中。

  • 注意 :basic.recover会尝试在相同的频道(对同一个消费者)重新发布未被消息的消息,这有时是期望的行为。

    RabbitMQ规范basic.recover和basic.nack


    真正的问题是:为什么这些消息未被确认?

    可能的情况:

  • 消费者获取太多的消息,然后不够快速地处理和确认它们。

    解决方案:根据需要预取尽可能少的消息。

  • Buggy客户端库(我现在有这个问题,使用pika 0.9.13 ,如果队列中有很多消息,一定数量的消息会在几个小时后被取消。

    解决方案:我必须重新启动消费者几次,直到所有未消息消息都从队列中消失。


  • 所有工作人员/消费者停止后,所有未确认的消息将进入就绪状态。

    通过ps aux输出确认grep来确保所有工人停止工作,如果找到,则停止/消除他们。

    如果您正在使用主管来管理工作人员,这表明工作人员已停止工作,您可能需要检查僵尸。 主管报告工作人员被停止,但仍然会在ps aux输出时发现僵尸进程正在运行。 杀死僵尸进程将使消息回到就绪状态。

    链接地址: http://www.djcxy.com/p/34095.html

    上一篇: How can I recover unacknowledged AMQP messages from other channels than my connection's own?

    下一篇: Dumb pipes smart endpoints