How can I recover unacknowledged AMQP messages from other channels than my connection's own?
It seems the longer I keep my RabbitMQ server running, the more trouble I have with unacknowledged messages. I would love to requeue them. In fact there seems to be an AMQP command to do this (basic.recover), but it only applies to the channel that your connection is using. I built a little pika script to at least try it out, but either I am missing something or it cannot be done this way. (How about with rabbitmqctl?)
import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost', port=5672,
                                       credentials=credentials,
                                       virtual_host='***')

def handle_delivery(frame):
    """Called when RabbitMQ answers the recover request (Basic.RecoverOk)"""
    print(frame)

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_open_callback=on_channel_open)

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    # Only covers messages delivered on this channel, not other consumers'
    channel.basic_recover(callback=handle_delivery, requeue=True)

try:
    connection = pika.SelectConnection(parameters=parameters,
                                       on_open_callback=on_connected)
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
Unacknowledged messages are those that have been delivered across the network to a consumer but have not yet been ack'ed or rejected, while that consumer still holds open the channel or connection over which it originally received them. The broker therefore can't tell whether the consumer is just taking a long time to process those messages or has forgotten about them, so it leaves them in an unacknowledged state until either the consumer dies or they get ack'ed or rejected.
Since those messages could still be validly processed in the future by the still-alive consumer that originally consumed them, you can't (to my knowledge) insert another consumer into the mix and try to make external decisions about them. You need to fix your consumers so that they make a decision about each message as it is processed, rather than leaving old messages unacknowledged.
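As a minimal sketch of that advice, assuming pika 1.x's consumer callback signature (channel, method, properties, body), a callback can settle every delivery explicitly. The `process` function is a hypothetical stand-in for the real work, not something from the question:

```python
def process(body):
    """Hypothetical stand-in for the real work; raises on failure."""
    if not body:
        raise ValueError("empty message")


def on_message(channel, method, properties, body):
    """Settle every delivery: ack on success, requeue on failure."""
    try:
        process(body)
    except Exception:
        # Hand the message back to the broker so it returns to the ready state.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    else:
        channel.basic_ack(delivery_tag=method.delivery_tag)
```

It would be registered with something like channel.basic_consume(queue='tasks', on_message_callback=on_message, auto_ack=False), where the queue name is an assumption. Note that requeueing on every failure can redeliver a permanently bad message forever, so a real consumer may prefer requeue=False for errors that will never succeed.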
If messages are unacked, there are only two ways to get them back into the queue:
1. basic.nack: With requeue set, this command causes the message to be placed back into the queue and redelivered.
2. Disconnect from the broker: This forces all unacked messages from this channel to be put back into the queue.
NOTE: basic.recover will try to redeliver unacked messages on the same channel (to the same consumer), which is sometimes the desired behaviour.
See the RabbitMQ spec for basic.recover and basic.nack.
The real question is: Why are the messages unacknowledged?
Possible scenarios:
1. The consumer fetches too many messages and then does not process and ack them quickly enough.
   Solution: Prefetch as few messages as appropriate.
2. A buggy client library. (I currently have this issue with pika 0.9.13: if the queue has a lot of messages, a certain number of them get stuck unacked, even hours later.)
   Solution: I have to restart the consumer several times until all unacked messages are gone from the queue.
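For the prefetch scenario, a rough sketch of limiting the prefetch count, assuming a pika 1.x BlockingChannel (the helper name `subscribe_with_prefetch` and its parameters are made up for illustration):

```python
def subscribe_with_prefetch(channel, queue, on_message, prefetch_count=1):
    """Cap unacked deliveries for this consumer, then subscribe with manual acks."""
    # The broker stops pushing messages to this consumer once
    # prefetch_count deliveries are waiting for an ack.
    channel.basic_qos(prefetch_count=prefetch_count)
    channel.basic_consume(queue=queue,
                          on_message_callback=on_message,
                          auto_ack=False)
```

With prefetch_count=1 a slow consumer holds at most one unacked message at a time, at the cost of some throughput; a larger value trades that safety for speed.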
All unacknowledged messages will return to the ready state once all the workers/consumers are stopped.
Ensure all workers are stopped by grepping the output of ps aux, and stop or kill any that you find.
If you manage workers with supervisor, it may report a worker as stopped while zombie processes are still running; grepping the ps aux output will reveal them. Killing those processes brings the messages back to the ready state.
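To confirm that the messages really did return to the ready state, one option is to read the counts from rabbitmqctl list_queues. The sketch below assumes rabbitmqctl is on the PATH, that the queues live in the default vhost, and that the output is tab-separated (the exact banner and header lines vary between RabbitMQ versions, so the parser simply skips non-numeric rows). The function names are invented for illustration:

```python
import subprocess


def parse_list_queues(output):
    """Parse tab-separated 'name ready unacked' rows into a dict."""
    counts = {}
    for line in output.splitlines():
        fields = line.split("\t")
        # Skip banners and the column header row, which are not numeric.
        if len(fields) != 3 or not (fields[1].isdigit() and fields[2].isdigit()):
            continue
        counts[fields[0]] = {"ready": int(fields[1]), "unacked": int(fields[2])}
    return counts


def queue_counts():
    """Ask the local broker for ready/unacked counts per queue."""
    output = subprocess.check_output(
        ["rabbitmqctl", "-q", "list_queues",
         "name", "messages_ready", "messages_unacknowledged"],
        universal_newlines=True,
    )
    return parse_list_queues(output)
```

Calling queue_counts() before and after stopping the workers should show the unacked figure dropping to zero and the ready figure rising by the same amount.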